From 8d9816809f6f05fc13de047ee6662312977be5fc Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Mon, 31 Oct 2022 09:33:28 +0000 Subject: [PATCH] simplify queue rotation protocol (#550) * simplify queue rotation protocol * use simplified rotation protocol, update tests * simplify schema * delete all connection queues * refactor * switch notifications to the new queue, tests * remove TODO * refactor * rfc correction Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com> * remove duplicate set active Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com> --- rfcs/2022-08-14-queue-rotation.md | 19 +- src/Simplex/Messaging/Agent.hs | 314 +++++++++--------- src/Simplex/Messaging/Agent/Client.hs | 1 - .../Messaging/Agent/NtfSubSupervisor.hs | 93 +++--- src/Simplex/Messaging/Agent/Protocol.hs | 28 +- src/Simplex/Messaging/Agent/Store.hs | 24 +- src/Simplex/Messaging/Agent/Store/SQLite.hs | 133 ++++---- .../Migrations/M20220915_connection_queues.hs | 10 +- .../Store/SQLite/Migrations/agent_schema.sql | 5 +- src/Simplex/Messaging/Notifications/Types.hs | 4 + tests/AgentTests/FunctionalAPITests.hs | 91 +++-- tests/AgentTests/NotificationTests.hs | 31 +- tests/AgentTests/SQLiteTests.hs | 16 +- 13 files changed, 408 insertions(+), 361 deletions(-) diff --git a/rfcs/2022-08-14-queue-rotation.md b/rfcs/2022-08-14-queue-rotation.md index d31e8ec11..fa40fda7e 100644 --- a/rfcs/2022-08-14-queue-rotation.md +++ b/rfcs/2022-08-14-queue-rotation.md @@ -24,8 +24,6 @@ Additional agent messages required for the protocol: QKEY_ -> "QK" QUSE_ -> "QU" QTEST_ -> "QT" - QDEL_ -> "QD" - QEND_ -> "QE" `QADD`: add the new queue address(es), the same format as `REPLY` message, encoded as `QA`. @@ -33,11 +31,7 @@ Additional agent messages required for the protocol: `QUSE`: instruct the sender to use the new queue with sender's queue ID as parameter, encoded as `QU`. -`QTEST`: send test message to the new connection, encoded as `QT`. Any other message can be sent if available to continue rotation, the absence of this message is not an error. - -`QDEL`: instruct the sender to stop using the previous queue, encoded as `QD` - -`QEND`: notify the recipient that no messages will be sent to this queue, encoded as `QE`. The recipient will delete this queue. +`QTEST`: send test message to the new connection, encoded as `QT`. Any other message can be sent if available to continue rotation, the absence of this message is not an error. Once this message is successfully sent the sender will stop using the old queue. Once this message (or any other message in the new queue) is received, the recipient will stop using the old queue and delete it. ### Protocol @@ -50,13 +44,10 @@ participant R' as Server that hosts the new A's receive queue A ->> R': create new queue A ->> S ->> B: QADD (R'): snd address of the new queue(s) -B ->> A(R) ->> A: QKEY (R'): sender's key for the new queue(s) (to avoid the race of SMP confirmation for the initial exchange) +B ->> S(R) ->> A: QKEY (R'): sender's key for the new queue(s) (to avoid the race of SMP confirmation for the initial exchange) A ->> S(R'): secure new queue A ->> S ->> B: QUSE (R'): instruction to use new queue(s) -B ->> A(R,R') ->> A: QTEST -B ->> A(R,R') ->> A: send all new messages to both queues -A ->> S ->> B: QDEL (R): instruction to delete the old queue -B ->> A(R') -> A: QEND (R): notification that no messages will be sent to the old queue -B ->> R' ->> A: send all new messages to the new queue only -A ->> S(R): DEL: delete the previous queue +B ->> S(R') ->> A: QTEST +A ->> S(R): DEL: delete the old queue +B ->> S(R') ->> A: send all new messages to the new queue ``` diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 20dc8ffcd..ae0436e87 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -348,7 +348,7 @@ newConnAsync :: forall m c. (AgentMonad m, ConnectionModeI c) => AgentClient -> newConnAsync c corrId enableNtfs cMode = do g <- asks idsDrg connAgentVersion <- asks $ maxVersion . smpAgentVRange . config - let cData = ConnData {connId = "", connAgentVersion, enableNtfs, duplexHandshake = Nothing} -- connection mode is determined by the accepting agent + let cData = ConnData {connId = "", connAgentVersion, enableNtfs, duplexHandshake = Nothing, deleted = False} -- connection mode is determined by the accepting agent connId <- withStore c $ \db -> createNewConn db g cData cMode enqueueCommand c corrId connId Nothing $ AClientCommand $ NEW enableNtfs (ACM cMode) pure connId @@ -360,7 +360,7 @@ joinConnAsync c corrId enableNtfs cReqUri@(CRInvitationUri (ConnReqUriData _ age Just (Compatible connAgentVersion) -> do g <- asks idsDrg let duplexHS = connAgentVersion /= 1 - cData = ConnData {connId = "", connAgentVersion, enableNtfs, duplexHandshake = Just duplexHS} + cData = ConnData {connId = "", connAgentVersion, enableNtfs, duplexHandshake = Just duplexHS, deleted = False} connId <- withStore c $ \db -> createNewConn db g cData SCMInvitation enqueueCommand c corrId connId Nothing $ AClientCommand $ JOIN enableNtfs (ACR sConnectionMode cReqUri) cInfo pure connId @@ -402,20 +402,22 @@ ackMessageAsync' c corrId connId msgId = do enqueueCommand c corrId connId (Just server) . AClientCommand $ ACK msgId deleteConnectionAsync' :: forall m. AgentMonad m => AgentClient -> ACorrId -> ConnId -> m () -deleteConnectionAsync' c@AgentClient {subQ} corrId connId = - withStore c (`getConn` connId) >>= \case - -- TODO *** delete all queues - SomeConn _ (DuplexConnection _ (rq :| _) _) -> enqueueDelete rq - SomeConn _ (RcvConnection _ rq) -> enqueueDelete rq - SomeConn _ (ContactConnection _ rq) -> enqueueDelete rq - SomeConn _ (SndConnection _ _) -> withStore' c (`deleteConn` connId) >> notifyDeleted - SomeConn _ (NewConnection _) -> withStore' c (`deleteConn` connId) >> notifyDeleted +deleteConnectionAsync' c@AgentClient {subQ} corrId connId = withConnLock c connId "deleteConnectionAsync" $ do + SomeConn _ conn <- withStore c (`getConn` connId) + case conn of + DuplexConnection _ (rq :| _) _ -> enqueueDelete rq + RcvConnection _ rq -> enqueueDelete rq + ContactConnection _ rq -> enqueueDelete rq + SndConnection _ _ -> delete + NewConnection _ -> delete where enqueueDelete :: RcvQueue -> m () - enqueueDelete RcvQueue {server} = - enqueueCommand c corrId connId (Just server) $ AClientCommand DEL - notifyDeleted :: m () - notifyDeleted = atomically $ writeTBQueue subQ (corrId, connId, OK) + enqueueDelete RcvQueue {server} = do + withStore' c $ \db -> setConnDeleted db connId + disableConn c connId + enqueueCommand c corrId connId (Just server) $ AInternalCommand ICDeleteConn + delete :: m () + delete = withStore' c (`deleteConn` connId) >> atomically (writeTBQueue subQ (corrId, connId, OK)) -- | Add connection to the new receive queue switchConnectionAsync' :: AgentMonad m => AgentClient -> ACorrId -> ConnId -> m () @@ -451,7 +453,7 @@ newConnSrv c connId asyncMode enableNtfs cMode srv = do pure connId setUpConn False rq connAgentVersion = do g <- asks idsDrg - let cData = ConnData {connId, connAgentVersion, enableNtfs, duplexHandshake = Nothing} -- connection mode is determined by the accepting agent + let cData = ConnData {connId, connAgentVersion, enableNtfs, duplexHandshake = Nothing, deleted = False} -- connection mode is determined by the accepting agent withStore c $ \db -> createRcvConn db g cData rq cMode joinConn :: AgentMonad m => AgentClient -> ConnId -> Bool -> Bool -> ConnectionRequestUri c -> ConnInfo -> m ConnId @@ -475,7 +477,7 @@ joinConnSrv c connId asyncMode enableNtfs (CRInvitationUri (ConnReqUriData _ age let rc = CR.initSndRatchet e2eEncryptVRange rcDHRr rcDHRs $ CR.x3dhSnd pk1 pk2 e2eRcvParams q <- newSndQueue "" qInfo let duplexHS = connAgentVersion /= 1 - cData = ConnData {connId, connAgentVersion, enableNtfs, duplexHandshake = Just duplexHS} + cData = ConnData {connId, connAgentVersion, enableNtfs, duplexHandshake = Just duplexHS, deleted = False} connId' <- setUpConn asyncMode cData q rc let sq = (q :: SndQueue) {connId = connId'} cData' = (cData :: ConnData) {connId = connId'} @@ -558,23 +560,23 @@ rejectContact' c contactConnId invId = -- | Subscribe to receive connection messages (SUB command) in Reader monad subscribeConnection' :: forall m. AgentMonad m => AgentClient -> ConnId -> m () -subscribeConnection' c connId = - withStore c (`getConn` connId) >>= \conn -> do - resumeConnCmds c connId - case conn of - SomeConn _ (DuplexConnection cData (rq :| rqs) sqs) -> do - mapM_ (resumeMsgDelivery c cData) sqs - subscribe cData rq - mapM_ (\q -> subscribeQueue c q `catchError` \_ -> pure ()) rqs - SomeConn _ (SndConnection cData sq) -> do - resumeMsgDelivery c cData sq - case status (sq :: SndQueue) of - Confirmed -> pure () - Active -> throwError $ CONN SIMPLEX - _ -> throwError $ INTERNAL "unexpected queue status" - SomeConn _ (RcvConnection cData rq) -> subscribe cData rq - SomeConn _ (ContactConnection cData rq) -> subscribe cData rq - SomeConn _ (NewConnection _) -> pure () +subscribeConnection' c connId = do + SomeConn _ conn <- withStore c (`getConn` connId) + resumeConnCmds c connId + case conn of + DuplexConnection cData (rq :| rqs) sqs -> do + mapM_ (resumeMsgDelivery c cData) sqs + subscribe cData rq + mapM_ (\q -> subscribeQueue c q `catchError` \_ -> pure ()) rqs + SndConnection cData sq -> do + resumeMsgDelivery c cData sq + case status (sq :: SndQueue) of + Confirmed -> pure () + Active -> throwError $ CONN SIMPLEX + _ -> throwError $ INTERNAL "unexpected queue status" + RcvConnection cData rq -> subscribe cData rq + ContactConnection cData rq -> subscribe cData rq + NewConnection _ -> pure () where subscribe :: ConnData -> RcvQueue -> m () subscribe ConnData {enableNtfs} rq = do @@ -603,12 +605,12 @@ subscribeConnections' c connIds = do pure rs where rcvQueueOrResult :: SomeConn -> Either (Either AgentErrorType ()) (NonEmpty RcvQueue) - rcvQueueOrResult = \case - SomeConn _ (DuplexConnection _ rqs _) -> Right rqs - SomeConn _ (SndConnection _ sq) -> Left $ sndSubResult sq - SomeConn _ (RcvConnection _ rq) -> Right [rq] - SomeConn _ (ContactConnection _ rq) -> Right [rq] - SomeConn _ (NewConnection _) -> Left (Right ()) + rcvQueueOrResult (SomeConn _ conn) = case conn of + DuplexConnection _ rqs _ -> Right rqs + SndConnection _ sq -> Left $ sndSubResult sq + RcvConnection _ rq -> Right [rq] + ContactConnection _ rq -> Right [rq] + NewConnection _ -> Left (Right ()) sndSubResult :: SndQueue -> Either AgentErrorType () sndSubResult sq = case status (sq :: SndQueue) of Confirmed -> Right () @@ -643,9 +645,9 @@ subscribeConnections' c connIds = do _ -> pure () _ -> pure () sndQueue :: SomeConn -> Maybe (ConnData, NonEmpty SndQueue) - sndQueue = \case - SomeConn _ (DuplexConnection cData _ sqs) -> Just (cData, sqs) - SomeConn _ (SndConnection cData sq) -> Just (cData, [sq]) + sndQueue (SomeConn _ conn) = case conn of + DuplexConnection cData _ sqs -> Just (cData, sqs) + SndConnection cData sq -> Just (cData, [sq]) _ -> Nothing notifyResultError :: Map ConnId (Either AgentErrorType ()) -> m () notifyResultError rs = do @@ -671,12 +673,13 @@ resubscribeConnections' c connIds = do getConnectionMessage' :: AgentMonad m => AgentClient -> ConnId -> m (Maybe SMPMsgMeta) getConnectionMessage' c connId = do whenM (atomically $ hasActiveSubscription c connId) . throwError $ CMD PROHIBITED - withStore c (`getConn` connId) >>= \case - SomeConn _ (DuplexConnection _ (rq :| _) _) -> getQueueMessage c rq - SomeConn _ (RcvConnection _ rq) -> getQueueMessage c rq - SomeConn _ (ContactConnection _ rq) -> getQueueMessage c rq - SomeConn _ SndConnection {} -> throwError $ CONN SIMPLEX - SomeConn _ NewConnection {} -> throwError $ CMD PROHIBITED + SomeConn _ conn <- withStore c (`getConn` connId) + case conn of + DuplexConnection _ (rq :| _) _ -> getQueueMessage c rq + RcvConnection _ rq -> getQueueMessage c rq + ContactConnection _ rq -> getQueueMessage c rq + SndConnection _ _ -> throwError $ CONN SIMPLEX + NewConnection _ -> throwError $ CMD PROHIBITED getNotificationMessage' :: forall m. AgentMonad m => AgentClient -> C.CbNonce -> ByteString -> m (NotificationInfo, [SMPMsgMeta]) getNotificationMessage' c nonce encNtfInfo = do @@ -708,9 +711,10 @@ getNotificationMessage' c nonce encNtfInfo = do -- | Send message to the connection (SEND command) in Reader monad sendMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> MsgFlags -> MsgBody -> m AgentMsgId sendMessage' c connId msgFlags msg = withConnLock c connId "sendMessage" $ do - withStore c (`getConn` connId) >>= \case - SomeConn _ (DuplexConnection cData _ sqs) -> enqueueMsgs cData sqs - SomeConn _ (SndConnection cData sq) -> enqueueMsgs cData [sq] + SomeConn _ conn <- withStore c (`getConn` connId) + case conn of + DuplexConnection cData _ sqs -> enqueueMsgs cData sqs + SndConnection cData sq -> enqueueMsgs cData [sq] _ -> throwError $ CONN SIMPLEX where enqueueMsgs :: ConnData -> NonEmpty SndQueue -> m AgentMsgId @@ -805,6 +809,23 @@ runCommandProcessing c@AgentClient {subQ} server_ = do secure rq senderKey when (duplexHandshake cData == Just True) . void $ enqueueMessage c cData sq SMP.MsgFlags {notification = True} HELLO + ICDeleteConn -> + withServer $ \srv -> tryWithLock "ICDeleteConn" $ do + SomeConn _ conn <- withStore c $ \db -> getAnyConn db connId True + case conn of + DuplexConnection _ (rq :| rqs) _ -> delete srv rq $ case rqs of + [] -> notify OK + RcvQueue {server = srv'} : _ -> enqueue srv' + RcvConnection _ rq -> delete srv rq $ notify OK + ContactConnection _ rq -> delete srv rq $ notify OK + _ -> internalErr "command requires connection with rcv queue" + where + delete :: SMPServer -> RcvQueue -> m () -> m () + delete srv rq@RcvQueue {server} next + | sameSrvAddr srv server = deleteConnQueue c rq >> next + | otherwise = enqueue server + enqueue :: SMPServer -> m () + enqueue srv = enqueueCommand c corrId connId (Just srv) $ AInternalCommand ICDeleteConn ICQSecure rId senderKey -> withServer $ \srv -> tryWithLock "ICQSecure" . withDuplexConn $ \(DuplexConnection cData rqs sqs) -> case find (sameQueue (srv, rId)) rqs of @@ -822,6 +843,9 @@ runCommandProcessing c@AgentClient {subQ} server_ = do | otherwise -> do deleteQueue c rq' withStore' c $ \db -> deleteConnRcvQueue db connId rq' + when (enableNtfs cData) $ do + ns <- asks ntfSupervisor + atomically $ sendNtfSubCommand ns (connId, NSCCreate) let conn' = DuplexConnection cData (rq'' :| rqs') sqs notify $ SWITCH SPCompleted $ connectionStats conn' _ -> internalErr "ICQDelete: cannot delete the only queue in connection" @@ -957,7 +981,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh _ -> sendAgentMessage c sq msgFlags msgBody case resp of Left e -> do - let err = if msgType == AM_CONN_INFO then ERR e else MERR mId e + let err = if msgType == AM_A_MSG_ then MERR mId e else ERR e case e of SMP SMP.QUOTA -> case msgType of AM_CONN_INFO -> connError msgId NOT_AVAILABLE @@ -978,14 +1002,12 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh Just _ -> connError msgId NOT_AVAILABLE -- party joining connection _ -> connError msgId NOT_ACCEPTED - AM_REPLY_ -> notifyDel msgId $ ERR e - AM_A_MSG_ -> notifyDel msgId $ MERR mId e - AM_QADD_ -> pure () - AM_QKEY_ -> pure () - AM_QUSE_ -> pure () - AM_QTEST_ -> pure () - AM_QDEL_ -> pure () - AM_QEND_ -> pure () + AM_REPLY_ -> notifyDel msgId err + AM_A_MSG_ -> notifyDel msgId err + AM_QADD_ -> qError msgId "QADD: AUTH" + AM_QKEY_ -> qError msgId "QKEY: AUTH" + AM_QUSE_ -> qError msgId "QUSE: AUTH" + AM_QTEST_ -> qError msgId "QTEST: AUTH" _ -- for other operations BROKER HOST is treated as a permanent error (e.g., when connecting to the server), -- the message sending would be retried @@ -1034,11 +1056,30 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh AM_QADD_ -> pure () AM_QKEY_ -> pure () AM_QUSE_ -> pure () - AM_QTEST_ -> + AM_QTEST_ -> do withStore' c $ \db -> setSndQueueStatus db sq Active - AM_QDEL_ -> pure () - AM_QEND_ -> - getConnectionServers' c connId >>= notify . SWITCH SPCompleted + SomeConn _ conn <- withStore c (`getConn` connId) + case conn of + DuplexConnection cData' rqs sqs -> do + -- remove old snd queue from connection once QTEST is sent to the new queue + case findQ (qAddress sq) sqs of + -- this is the same queue where this loop delivers messages to but with updated state + Just SndQueue {dbReplaceQueueId = Just replacedId, primary} -> + case removeQP (\SndQueue {dbQueueId} -> dbQueueId == replacedId) sqs of + Nothing -> internalErr msgId "sent QTEST: queue not found in connection" + Just (sq', sq'' : sqs') -> do + -- remove the delivery from the map to stop the thread when the delivery loop is complete + atomically $ TM.delete (qAddress sq') $ smpQueueMsgQueues c + withStore' c $ \db -> do + when primary $ setSndQueuePrimary db connId sq' + deletePendingMsgs db connId sq' + deleteConnSndQueue db connId sq' + let sqs'' = sq'' :| sqs' + conn' = DuplexConnection cData' rqs sqs'' + notify . SWITCH SPCompleted $ connectionStats conn' + _ -> internalErr msgId "sent QTEST: there is only one queue in connection" + _ -> internalErr msgId "sent QTEST: queue not in connection or not replacing another queue" + _ -> internalErr msgId "QTEST sent not in duplex connection" delMsg msgId where delMsg :: InternalId -> m () @@ -1048,6 +1089,8 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh notifyDel :: InternalId -> ACommand 'Agent -> m () notifyDel msgId cmd = notify cmd >> delMsg msgId connError msgId = notifyDel msgId . ERR . CONN + qError msgId = notifyDel msgId . ERR . AGENT . A_QUEUE + internalErr msgId = notifyDel msgId . ERR . INTERNAL retrySndOp :: AgentMonad m => AgentClient -> m () -> m () retrySndOp c loop = do @@ -1084,7 +1127,7 @@ switchConnection' c connId = withConnLock c connId "switchConnection" $ do srv <- getNextSMPServer c $ map qServer (L.toList rqs) <> map qServer (L.toList sqs) srv' <- if srv == server then getNextSMPServer c [server] else pure srv (q, qUri) <- newRcvQueue c connId srv' clientVRange - let rq' = (q :: RcvQueue) {primary = False, nextPrimary = True, dbReplaceQueueId = Just dbQueueId} + let rq' = (q :: RcvQueue) {primary = True, dbReplaceQueueId = Just dbQueueId} void . withStore c $ \db -> addConnRcvQueue db connId rq' addSubscription c rq' void . enqueueMessages c cData sqs SMP.noMsgFlags $ QADD [(qUri, Just (server, sndId))] @@ -1100,30 +1143,39 @@ ackQueueMessage c rq srvMsgId = -- | Suspend SMP agent connection (OFF command) in Reader monad suspendConnection' :: AgentMonad m => AgentClient -> ConnId -> m () suspendConnection' c connId = withConnLock c connId "suspendConnection" $ do - withStore c (`getConn` connId) >>= \case - SomeConn _ (DuplexConnection _ rqs _) -> mapM_ (suspendQueue c) rqs - SomeConn _ (RcvConnection _ rq) -> suspendQueue c rq - SomeConn _ (ContactConnection _ rq) -> suspendQueue c rq - SomeConn _ (SndConnection _ _) -> throwError $ CONN SIMPLEX - SomeConn _ (NewConnection _) -> throwError $ CMD PROHIBITED + SomeConn _ conn <- withStore c (`getConn` connId) + case conn of + DuplexConnection _ rqs _ -> mapM_ (suspendQueue c) rqs + RcvConnection _ rq -> suspendQueue c rq + ContactConnection _ rq -> suspendQueue c rq + SndConnection _ _ -> throwError $ CONN SIMPLEX + NewConnection _ -> throwError $ CMD PROHIBITED -- | Delete SMP agent connection (DEL command) in Reader monad deleteConnection' :: forall m. AgentMonad m => AgentClient -> ConnId -> m () deleteConnection' c connId = withConnLock c connId "deleteConnection" $ do - withStore c (`getConn` connId) >>= \case - SomeConn _ (DuplexConnection _ rqs _) -> mapM_ delete rqs - SomeConn _ (RcvConnection _ rq) -> delete rq - SomeConn _ (ContactConnection _ rq) -> delete rq - SomeConn _ (SndConnection _ _) -> withStore' c (`deleteConn` connId) - SomeConn _ (NewConnection _) -> withStore' c (`deleteConn` connId) + SomeConn _ conn <- withStore c (`getConn` connId) + case conn of + DuplexConnection _ rqs _ -> mapM_ (deleteConnQueue c) rqs >> disableConn c connId >> deleteConn' + RcvConnection _ rq -> delete rq + ContactConnection _ rq -> delete rq + SndConnection _ _ -> deleteConn' + NewConnection _ -> deleteConn' where delete :: RcvQueue -> m () - delete rq = do - deleteQueue c rq - atomically $ removeSubscription c connId - withStore' c (`deleteConn` connId) - ns <- asks ntfSupervisor - atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCDelete) + delete rq = deleteConnQueue c rq >> disableConn c connId >> deleteConn' + deleteConn' = withStore' c (`deleteConn` connId) + +deleteConnQueue :: AgentMonad m => AgentClient -> RcvQueue -> m () +deleteConnQueue c rq@RcvQueue {connId} = do + deleteQueue c rq + withStore' c $ \db -> deleteConnRcvQueue db connId rq + +disableConn :: AgentMonad m => AgentClient -> ConnId -> m () +disableConn c connId = do + atomically $ removeSubscription c connId + ns <- asks ntfSupervisor + atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCDelete) getConnectionServers' :: AgentMonad m => AgentClient -> ConnId -> m ConnectionStats getConnectionServers' c connId = do @@ -1268,10 +1320,11 @@ getNtfTokenData' c = -- | Set connection notifications, in Reader monad toggleConnectionNtfs' :: forall m. AgentMonad m => AgentClient -> ConnId -> Bool -> m () toggleConnectionNtfs' c connId enable = do - withStore c (`getConn` connId) >>= \case - SomeConn _ (DuplexConnection cData _ _) -> toggle cData - SomeConn _ (RcvConnection cData _) -> toggle cData - SomeConn _ (ContactConnection cData _) -> toggle cData + SomeConn _ conn <- withStore c (`getConn` connId) + case conn of + DuplexConnection cData _ _ -> toggle cData + RcvConnection cData _ -> toggle cData + ContactConnection cData _ -> toggle cData _ -> throwError $ CONN SIMPLEX where toggle :: ConnData -> m () @@ -1409,18 +1462,12 @@ subscriber c@AgentClient {msgQ} = forever $ do Right _ -> return () processSMPTransmission :: forall m. AgentMonad m => AgentClient -> ServerTransmission BrokerMsg -> m () -processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cmd) = - withStore c (\db -> getRcvConn db srv rId) >>= \case - -- TODO *** get queue separately? - SomeConn _ conn@(DuplexConnection cData rqs _) -> case find (sameQueue (srv, rId)) rqs of - Just rq -> processSMP conn cData rq - _ -> atomically $ writeTBQueue subQ ("", "", ERR $ CONN NOT_FOUND) - SomeConn _ conn@(RcvConnection cData rq) -> processSMP conn cData rq - SomeConn _ conn@(ContactConnection cData rq) -> processSMP conn cData rq - _ -> atomically $ writeTBQueue subQ ("", "", ERR $ CONN NOT_FOUND) +processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cmd) = do + (rq, SomeConn _ conn) <- withStore c (\db -> getRcvConn db srv rId) + processSMP rq conn $ connData conn where - processSMP :: Connection c -> ConnData -> RcvQueue -> m () - processSMP conn cData@ConnData {connId, duplexHandshake} rq@RcvQueue {e2ePrivKey, e2eDhSecret, status} = withConnLock c connId "processSMP" $ + processSMP :: RcvQueue -> Connection c -> ConnData -> m () + processSMP rq@RcvQueue {e2ePrivKey, e2eDhSecret, status} conn cData@ConnData {connId, duplexHandshake} = withConnLock c connId "processSMP" $ case cmd of SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId} -> handleNotifyAck $ do SMP.ClientRcvMsgBody {msgTs = srvTs, msgFlags, msgBody} <- decryptSMPMessage v rq msg @@ -1441,17 +1488,15 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm decryptClientMessage e2eDh clientMsg >>= \case (SMP.PHEmpty, AgentMsgEnvelope _ encAgentMsg) -> do -- primary queue is set as Active in helloMsg, below is to set additional queues Active - let RcvQueue {primary, nextPrimary, dbReplaceQueueId} = rq - unless primary . withStore' c $ \db -> do - unless (status == Active) $ setRcvQueueStatus db rq Active - when nextPrimary $ setRcvQueuePrimary db connId rq + let RcvQueue {primary, dbReplaceQueueId} = rq + unless (status == Active) . withStore' c $ \db -> setRcvQueueStatus db rq Active case (conn, dbReplaceQueueId) of - (DuplexConnection _ rqs sqs, Just dbRcvId) -> - case find (\RcvQueue {dbQueueId} -> dbQueueId == dbRcvId) rqs of - Just RcvQueue {server, sndId} -> do - void . enqueueMessages c cData sqs SMP.noMsgFlags $ QDEL [(server, sndId)] - notify . SWITCH SPTested $ connectionStats conn - _ -> throwError $ INTERNAL "replaced RcvQueue not found in connection" + (DuplexConnection _ rqs _, Just replacedId) -> do + when primary . withStore' c $ \db -> setRcvQueuePrimary db connId rq + case find (\RcvQueue {dbQueueId} -> dbQueueId == replacedId) rqs of + Just RcvQueue {server, rcvId} -> do + enqueueCommand c "" connId (Just server) $ AInternalCommand $ ICQDelete rcvId + _ -> notify . ERR . AGENT $ A_QUEUE "replaced RcvQueue not found in connection" _ -> pure () tryError agentClientMsg >>= \case Right (Just (msgId, msgMeta, aMessage)) -> case aMessage of @@ -1467,8 +1512,6 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm -- no action needed for QTEST -- any message in the new queue will mark it active and trigger deletion of the old queue QTEST _ -> logServer "<--" c srv rId "MSG " >> ackDel msgId - QDEL qs -> qDuplex "QDEL" $ qDelMsg qs - QEND qs -> qDuplex "QEND" $ qEndMsg qs where qDuplex :: String -> (Connection 'CDuplex -> m ()) -> m () qDuplex name a = case conn of @@ -1606,8 +1649,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm logServer "<--" c srv rId "MSG " case status of Active -> prohibited - _ -> do - withStore' c $ \db -> setRcvQueueStatus db rq Active + _ -> case conn of DuplexConnection _ _ (sq@SndQueue {status = sndStatus} :| _) -- `sndStatus == Active` when HELLO was previously sent, and this is the reply HELLO @@ -1644,7 +1686,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm (Just _, _) -> qError "QADD: queue address is already used in connection" (_, Just _replaced@SndQueue {dbQueueId}) -> do sq_@SndQueue {sndPublicKey, e2ePubKey} <- newSndQueue connId qInfo - let sq' = (sq_ :: SndQueue) {nextPrimary = True, dbReplaceQueueId = Just dbQueueId} + let sq' = (sq_ :: SndQueue) {primary = True, dbReplaceQueueId = Just dbQueueId} void . withStore c $ \db -> addConnSndQueue db connId sq' case (sndPublicKey, e2ePubKey) of (Just sndPubKey, Just dhPublicKey) -> do @@ -1678,47 +1720,18 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm -- processed by queue sender -- mark queue as Secured and to start sending messages to it qUseMsg :: NonEmpty ((SMPServer, SMP.SenderId), Bool) -> Connection 'CDuplex -> m () - qUseMsg ((addr, primary) :| _) (DuplexConnection _ _ sqs) = - case removeQ addr sqs of - Just (sq', sqs') -> do + -- NOTE: does not yet support the change of the primary status during the rotation + qUseMsg ((addr, _primary) :| _) (DuplexConnection _ _ sqs) = + case findQ addr sqs of + Just sq' -> do logServer "<--" c srv rId $ "MSG " <> logSecret (snd addr) - withStore' c $ \db -> do - setSndQueueStatus db sq' Secured - when primary $ setSndQueuePrimary db connId sq' - let sq'' = (sq' :: SndQueue) {status = Secured, primary} - void $ enqueueMessages c cData (sq'' :| sqs') SMP.noMsgFlags $ QTEST [addr] + withStore' c $ \db -> setSndQueueStatus db sq' Secured + let sq'' = (sq' :: SndQueue) {status = Secured} + -- sending QTEST to the new queue only, the old one will be removed if sent successfully + void $ enqueueMessages c cData [sq''] SMP.noMsgFlags $ QTEST [addr] notify . SWITCH SPConfirmed $ connectionStats conn _ -> qError "QUSE: queue address not found in connection" - -- processed by queue sender - -- remove snd queue from connection and enqueue QEND message - qDelMsg :: NonEmpty (SMPServer, SMP.SenderId) -> Connection 'CDuplex -> m () - qDelMsg (addr :| _) (DuplexConnection _ rqs sqs) = - case removeQ addr sqs of - Nothing -> logServer "<--" c srv rId "MSG : queue not found (already deleted?)" - Just (sq, sq' : sqs') -> do - logServer "<--" c srv rId $ "MSG " <> logSecret (snd addr) - -- remove the delivery from the map to stop the thread when the delivery loop is complete - atomically $ TM.delete addr $ smpQueueMsgQueues c - withStore' c $ \db -> do - deletePendingMsgs db connId sq - deleteConnSndQueue db connId sq - let sqs'' = sq' :| sqs' - conn' = DuplexConnection cData rqs sqs'' - void $ enqueueMessages c cData sqs'' SMP.noMsgFlags $ QEND [addr] - notify . SWITCH SPTested $ connectionStats conn' - _ -> qError "QDEL received to the only queue in connection" - - -- received by party initiating switch - -- TODO *** check that the received address matches expectations - qEndMsg :: NonEmpty (SMPServer, SMP.SenderId) -> Connection 'CDuplex -> m () - qEndMsg (addr@(smpServer, senderId) :| _) (DuplexConnection _ rqs _) = - case findRQ addr rqs of - Just RcvQueue {rcvId} -> do - logServer "<--" c srv rId $ "MSG " <> logSecret senderId - enqueueCommand c "" connId (Just smpServer) $ AInternalCommand $ ICQDelete rcvId - _ -> qError "QEND: queue address not found in connection" - qError :: String -> m () qError = throwError . AGENT . A_QUEUE @@ -1835,7 +1848,6 @@ newSndQueue_ a connId (Compatible (SMPQueueInfo smpClientVersion SMPQueueAddress status = New, dbQueueId = 0, primary = True, - nextPrimary = False, dbReplaceQueueId = Nothing, smpClientVersion } diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index d336ea41c..cf11f7ead 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -549,7 +549,6 @@ newRcvQueue_ a c connId srv vRange = do status = New, dbQueueId = 0, primary = True, - nextPrimary = False, dbReplaceQueueId = Nothing, smpClientVersion = maxVersion vRange, clientNtfCreds = Nothing diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index 7476e85c7..d44ed300d 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -82,43 +82,48 @@ processNtfSub c (connId, cmd) = do case clientNtfCreds of Just ClientNtfCreds {notifierId} -> do let newSub = newNtfSubscription connId smpServer (Just notifierId) ntfServer NASKey - ts <- liftIO getCurrentTime - withStore' c $ \db -> createNtfSubscription db newSub (NtfSubNTFAction NSACreate) ts + withStore' c $ \db -> createNtfSubscription db newSub $ NtfSubNTFAction NSACreate addNtfNTFWorker ntfServer Nothing -> do let newSub = newNtfSubscription connId smpServer Nothing ntfServer NASNew - ts <- liftIO getCurrentTime - withStore' c $ \db -> createNtfSubscription db newSub (NtfSubSMPAction NSASmpKey) ts + withStore' c $ \db -> createNtfSubscription db newSub $ NtfSubSMPAction NSASmpKey addNtfSMPWorker smpServer - (Just (sub@NtfSubscription {ntfSubStatus, ntfServer = subNtfServer}, action_)) -> do - case action_ of - -- action was set to NULL after worker internal error - Nothing -> resetSubscription - Just (action, _) - -- subscription was marked for deletion / is being deleted - | isDeleteNtfSubAction action -> do - if ntfSubStatus == NASNew || ntfSubStatus == NASOff || ntfSubStatus == NASDeleted - then resetSubscription - else withNtfServer c $ \ntfServer -> do - ts <- liftIO getCurrentTime - withStore' c $ \db -> - supervisorUpdateNtfSubscription db sub {ntfServer} (NtfSubNTFAction NSACreate) ts - addNtfNTFWorker ntfServer - | otherwise -> case action of - NtfSubNTFAction _ -> addNtfNTFWorker subNtfServer - NtfSubSMPAction _ -> addNtfSMPWorker smpServer + (Just (sub@NtfSubscription {ntfSubStatus, ntfServer = subNtfServer, smpServer = smpServer', ntfQueueId}, action_)) -> do + case (clientNtfCreds, ntfQueueId) of + (Just ClientNtfCreds {notifierId}, Just ntfQueueId') + | sameSrvAddr smpServer smpServer' && notifierId == ntfQueueId' -> create + | otherwise -> rotate + (Nothing, Nothing) -> create + _ -> rotate where + create :: m () + create = case action_ of + -- action was set to NULL after worker internal error + Nothing -> resetSubscription + Just (action, _) + -- subscription was marked for deletion / is being deleted + | isDeleteNtfSubAction action -> do + if ntfSubStatus == NASNew || ntfSubStatus == NASOff || ntfSubStatus == NASDeleted + then resetSubscription + else withNtfServer c $ \ntfServer -> do + withStore' c $ \db -> supervisorUpdateNtfSub db sub {ntfServer} (NtfSubNTFAction NSACreate) + addNtfNTFWorker ntfServer + | otherwise -> case action of + NtfSubNTFAction _ -> addNtfNTFWorker subNtfServer + NtfSubSMPAction _ -> addNtfSMPWorker smpServer + rotate :: m () + rotate = do + withStore' c $ \db -> supervisorUpdateNtfSub db sub (NtfSubNTFAction NSARotate) + addNtfNTFWorker subNtfServer resetSubscription :: m () resetSubscription = withNtfServer c $ \ntfServer -> do - ts <- liftIO getCurrentTime - withStore' c $ \db -> - supervisorUpdateNtfSubscription db sub {ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew} (NtfSubSMPAction NSASmpKey) ts + let sub' = sub {ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew} + withStore' c $ \db -> supervisorUpdateNtfSub db sub' (NtfSubSMPAction NSASmpKey) addNtfSMPWorker smpServer NSCDelete -> do sub_ <- withStore' c $ \db -> do - ts <- liftIO getCurrentTime - supervisorUpdateNtfSubAction db connId (NtfSubNTFAction NSADelete) ts + supervisorUpdateNtfAction db connId (NtfSubNTFAction NSADelete) getNtfSubscription db connId logInfo $ "processNtfSub, NSCDelete - sub_ = " <> tshow sub_ case sub_ of @@ -128,14 +133,11 @@ processNtfSub c (connId, cmd) = do withStore' c (`getPrimaryRcvQueue` connId) >>= \case Right rq@RcvQueue {server = smpServer} -> do logInfo $ "processNtfSub, NSCSmpDelete - rq = " <> tshow rq - ts <- liftIO getCurrentTime - withStore' c $ \db -> supervisorUpdateNtfSubAction db connId (NtfSubSMPAction NSASmpDelete) ts + withStore' c $ \db -> supervisorUpdateNtfAction db connId (NtfSubSMPAction NSASmpDelete) addNtfSMPWorker smpServer _ -> notifyInternalError c connId "NSCSmpDelete - no rcv queue" - NSCNtfWorker ntfServer -> - addNtfNTFWorker ntfServer - NSCNtfSMPWorker smpServer -> - addNtfSMPWorker smpServer + NSCNtfWorker ntfServer -> addNtfNTFWorker ntfServer + NSCNtfSMPWorker smpServer -> addNtfSMPWorker smpServer where addNtfNTFWorker = addWorker ntfWorkers runNtfWorker addNtfSMPWorker = addWorker ntfSMPWorkers runNtfSMPWorker @@ -214,16 +216,25 @@ runNtfWorker c srv doWork = do _ -> workerInternalError c connId "NSACheck - no active token" NSADelete -> case ntfSubId of Just nSubId -> - (getNtfToken >>= \tkn -> forM_ tkn $ agentNtfDeleteSubscription c nSubId) - `E.finally` carryOnWithDeletion - Nothing -> carryOnWithDeletion + (getNtfToken >>= mapM_ (agentNtfDeleteSubscription c nSubId)) + `E.finally` continueDeletion + _ -> continueDeletion where - carryOnWithDeletion :: m () - carryOnWithDeletion = do - withStore' c $ \db -> - updateNtfSubscription db sub {ntfSubId = Nothing, ntfSubStatus = NASOff} (NtfSubSMPAction NSASmpDelete) ts + continueDeletion = do + let sub' = sub {ntfSubId = Nothing, ntfSubStatus = NASOff} + withStore' c $ \db -> updateNtfSubscription db sub' (NtfSubSMPAction NSASmpDelete) ts ns <- asks ntfSupervisor atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCNtfSMPWorker smpServer) + NSARotate -> case ntfSubId of + Just nSubId -> + (getNtfToken >>= mapM_ (agentNtfDeleteSubscription c nSubId)) + `E.finally` deleteCreate + _ -> deleteCreate + where + deleteCreate = do + withStore' c $ \db -> deleteNtfSubscription db connId + ns <- asks ntfSupervisor + atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCCreate) where updateSubNextCheck ts toStatus = do checkInterval <- asks $ ntfSubCheckInterval . config @@ -276,7 +287,7 @@ runNtfSMPWorker c srv doWork = do rq_ <- withStore' c $ \db -> do setRcvQueueNtfCreds db connId Nothing getPrimaryRcvQueue db connId - forM_ rq_ $ \rq -> disableQueueNotifications c rq + mapM_ (disableQueueNotifications c) rq_ withStore' c $ \db -> deleteNtfSubscription db connId rescheduleAction :: AgentMonad m => TMVar () -> UTCTime -> UTCTime -> m Bool @@ -346,7 +357,7 @@ closeNtfSupervisor ns = do cancelNtfWorkers_ :: TMap (ProtocolServer s) (TMVar (), Async ()) -> IO () cancelNtfWorkers_ wsVar = do ws <- atomically $ stateTVar wsVar (,M.empty) - forM_ ws $ uninterruptibleCancel . snd + mapM_ (uninterruptibleCancel . snd) ws getNtfServer :: AgentMonad m => AgentClient -> m (Maybe NtfServer) getNtfServer c = do diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index 727ae8403..ebd118a6b 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -345,20 +345,18 @@ aCommandTag = \case ERR _ -> ERR_ SUSPENDED -> SUSPENDED_ -data SwitchPhase = SPStarted | SPConfirmed | SPTested | SPCompleted +data SwitchPhase = SPStarted | SPConfirmed | SPCompleted deriving (Eq, Show) instance StrEncoding SwitchPhase where strEncode = \case SPStarted -> "started" SPConfirmed -> "confirmed" - SPTested -> "tested" SPCompleted -> "completed" strP = A.takeTill (== ' ') >>= \case "started" -> pure SPStarted "confirmed" -> pure SPConfirmed - "tested" -> pure SPTested "completed" -> pure SPCompleted _ -> fail "bad SwitchPhase" @@ -546,8 +544,6 @@ data AgentMessageType | AM_QKEY_ | AM_QUSE_ | AM_QTEST_ - | AM_QDEL_ - | AM_QEND_ deriving (Eq, Show) instance Encoding AgentMessageType where @@ -561,8 +557,6 @@ instance Encoding AgentMessageType where AM_QKEY_ -> "QK" AM_QUSE_ -> "QU" AM_QTEST_ -> "QT" - AM_QDEL_ -> "QD" - AM_QEND_ -> "QE" smpP = A.anyChar >>= \case 'C' -> pure AM_CONN_INFO @@ -576,8 +570,6 @@ instance Encoding AgentMessageType where 'K' -> pure AM_QKEY_ 'U' -> pure AM_QUSE_ 'T' -> pure AM_QTEST_ - 'D' -> pure AM_QDEL_ - 'E' -> pure AM_QEND_ _ -> fail "bad AgentMessageType" _ -> fail "bad AgentMessageType" @@ -598,8 +590,6 @@ agentMessageType = \case QKEY _ -> AM_QKEY_ QUSE _ -> AM_QUSE_ QTEST _ -> AM_QTEST_ - QDEL _ -> AM_QDEL_ - QEND _ -> AM_QEND_ data APrivHeader = APrivHeader { -- | sequential ID assigned by the sending agent @@ -622,8 +612,6 @@ data AMsgType | QKEY_ | QUSE_ | QTEST_ - | QDEL_ - | QEND_ deriving (Eq) instance Encoding AMsgType where @@ -635,8 +623,6 @@ instance Encoding AMsgType where QKEY_ -> "QK" QUSE_ -> "QU" QTEST_ -> "QT" - QDEL_ -> "QD" - QEND_ -> "QE" smpP = A.anyChar >>= \case 'H' -> pure HELLO_ @@ -648,8 +634,6 @@ instance Encoding AMsgType where 'K' -> pure QKEY_ 'U' -> pure QUSE_ 'T' -> pure QTEST_ - 'D' -> pure QDEL_ - 'E' -> pure QEND_ _ -> fail "bad AMsgType" _ -> fail "bad AMsgType" @@ -669,12 +653,8 @@ data AMessage QKEY (L.NonEmpty (SMPQueueInfo, SndPublicVerifyKey)) | -- inform that the queues are ready to use (sent by recipient) QUSE (L.NonEmpty (SndQAddr, Bool)) - | -- sent by the sender to test new queues + | -- sent by the sender to test new queues and to complete switching QTEST (L.NonEmpty SndQAddr) - | -- inform that the queues will be deleted (sent recipient once message received via the new queue) - QDEL (L.NonEmpty SndQAddr) - | -- sent by sender to confirm that no more messages will be sent to the queue - QEND (L.NonEmpty SndQAddr) deriving (Show) type SndQAddr = (SMPServer, SMP.SenderId) @@ -688,8 +668,6 @@ instance Encoding AMessage where QKEY qs -> smpEncode (QKEY_, qs) QUSE qs -> smpEncode (QUSE_, qs) QTEST qs -> smpEncode (QTEST_, qs) - QDEL qs -> smpEncode (QDEL_, qs) - QEND qs -> smpEncode (QEND_, qs) smpP = smpP >>= \case @@ -700,8 +678,6 @@ instance Encoding AMessage where QKEY_ -> QKEY <$> smpP QUSE_ -> QUSE <$> smpP QTEST_ -> QTEST <$> smpP - QDEL_ -> QDEL <$> smpP - QEND_ -> QEND <$> smpP instance forall m. ConnectionModeI m => StrEncoding (ConnectionRequestUri m) where strEncode = \case diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index 24fbbcd63..981d2cda8 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -65,10 +65,8 @@ data RcvQueue = RcvQueue status :: QueueStatus, -- | database queue ID (within connection), can be Nothing for old queues dbQueueId :: Int64, - -- | True for a primary queue of the connection + -- | True for a primary or a next primary queue of the connection (next if dbReplaceQueueId is set) primary :: Bool, - -- | True for the next primary queue - nextPrimary :: Bool, -- | database queue ID to replace, Nothing if this queue is not replacing another, `Just Nothing` is used for replacing old queues dbReplaceQueueId :: Maybe Int64, -- | SMP client version @@ -106,10 +104,8 @@ data SndQueue = SndQueue status :: QueueStatus, -- | database queue ID (within connection), can be Nothing for old queues dbQueueId :: Int64, - -- | True for a primary queue of the connection + -- | True for a primary or a next primary queue of the connection (next if dbReplaceQueueId is set) primary :: Bool, - -- | True for the next primary queue - nextPrimary :: Bool, -- | ID of the queue this one is replacing dbReplaceQueueId :: Maybe Int64, -- | SMP client version @@ -138,7 +134,11 @@ findQ = find . sameQueue {-# INLINE findQ #-} removeQ :: SMPQueue q => (SMPServer, SMP.QueueId) -> NonEmpty q -> Maybe (q, [q]) -removeQ addr qs = case L.break (sameQueue addr) qs of +removeQ = removeQP . sameQueue +{-# INLINE removeQ #-} + +removeQP :: (q -> Bool) -> NonEmpty q -> Maybe (q, [q]) +removeQP p qs = case L.break p qs of (_, []) -> Nothing (qs1, q : qs2) -> Just (q, qs1 <> qs2) @@ -224,7 +224,8 @@ data ConnData = ConnData { connId :: ConnId, connAgentVersion :: Version, enableNtfs :: Bool, - duplexHandshake :: Maybe Bool -- added in agent protocol v2 + duplexHandshake :: Maybe Bool, -- added in agent protocol v2 + deleted :: Bool } deriving (Eq, Show) @@ -272,6 +273,7 @@ data InternalCommand | ICAckDel SMP.RecipientId MsgId InternalId | ICAllowSecure SMP.RecipientId SMP.SndPublicVerifyKey | ICDuplexSecure SMP.RecipientId SMP.SndPublicVerifyKey + | ICDeleteConn | ICQSecure SMP.RecipientId SMP.SndPublicVerifyKey | ICQDelete SMP.RecipientId @@ -280,6 +282,7 @@ data InternalCommandTag | ICAckDel_ | ICAllowSecure_ | ICDuplexSecure_ + | ICDeleteConn_ | ICQSecure_ | ICQDelete_ deriving (Show) @@ -290,6 +293,7 @@ instance StrEncoding InternalCommand where ICAckDel rId srvMsgId mId -> strEncode (ICAckDel_, rId, srvMsgId, mId) ICAllowSecure rId sndKey -> strEncode (ICAllowSecure_, rId, sndKey) ICDuplexSecure rId sndKey -> strEncode (ICDuplexSecure_, rId, sndKey) + ICDeleteConn -> strEncode ICDeleteConn_ ICQSecure rId senderKey -> strEncode (ICQSecure_, rId, senderKey) ICQDelete rId -> strEncode (ICQDelete_, rId) strP = @@ -298,6 +302,7 @@ instance StrEncoding InternalCommand where ICAckDel_ -> ICAckDel <$> _strP <*> _strP <*> _strP ICAllowSecure_ -> ICAllowSecure <$> _strP <*> _strP ICDuplexSecure_ -> ICDuplexSecure <$> _strP <*> _strP + ICDeleteConn_ -> pure ICDeleteConn ICQSecure_ -> ICQSecure <$> _strP <*> _strP ICQDelete_ -> ICQDelete <$> _strP @@ -307,6 +312,7 @@ instance StrEncoding InternalCommandTag where ICAckDel_ -> "ACK_DEL" ICAllowSecure_ -> "ALLOW_SECURE" ICDuplexSecure_ -> "DUPLEX_SECURE" + ICDeleteConn_ -> "DELETE_CONN" ICQSecure_ -> "QSECURE" ICQDelete_ -> "QDELETE" strP = @@ -315,6 +321,7 @@ instance StrEncoding InternalCommandTag where "ACK_DEL" -> pure ICAckDel_ "ALLOW_SECURE" -> pure ICAllowSecure_ "DUPLEX_SECURE" -> pure ICDuplexSecure_ + "DELETE_CONN" -> pure ICDeleteConn_ "QSECURE" -> pure ICQSecure_ "QDELETE" -> pure ICQDelete_ _ -> fail "bad InternalCommandTag" @@ -330,6 +337,7 @@ internalCmdTag = \case ICAckDel {} -> ICAckDel_ ICAllowSecure {} -> ICAllowSecure_ ICDuplexSecure {} -> ICDuplexSecure_ + ICDeleteConn -> ICDeleteConn_ ICQSecure {} -> ICQSecure_ ICQDelete _ -> ICQDelete_ diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 20ec06ede..135e1b43c 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -34,7 +34,9 @@ module Simplex.Messaging.Agent.Store.SQLite createRcvConn, createSndConn, getConn, + getAnyConn, getConnData, + setConnDeleted, getRcvConn, deleteConn, upgradeRcvConnToDuplex, @@ -99,8 +101,8 @@ module Simplex.Messaging.Agent.Store.SQLite -- Notification subscription persistence getNtfSubscription, createNtfSubscription, - supervisorUpdateNtfSubscription, - supervisorUpdateNtfSubAction, + supervisorUpdateNtfSub, + supervisorUpdateNtfAction, updateNtfSubscription, setNullNtfSubscriptionAction, deleteNtfSubscription, @@ -343,19 +345,12 @@ createSndConn db gVar cData@ConnData {connAgentVersion, enableNtfs, duplexHandsh DB.execute db "INSERT INTO connections (conn_id, conn_mode, smp_agent_version, enable_ntfs, duplex_handshake) VALUES (?, ?, ?, ?, ?)" (connId, SCMInvitation, connAgentVersion, enableNtfs, duplexHandshake) void $ insertSndQueue_ db connId q -getRcvConn :: DB.Connection -> SMPServer -> SMP.RecipientId -> IO (Either StoreError SomeConn) -getRcvConn db ProtocolServer {host, port} rcvId = - DB.queryNamed - db - [sql| - SELECT q.conn_id - FROM rcv_queues q - WHERE q.host = :host AND q.port = :port AND q.rcv_id = :rcv_id; - |] - [":host" := host, ":port" := port, ":rcv_id" := rcvId] - >>= \case - [Only connId] -> getConn db connId - _ -> pure $ Left SEConnNotFound +getRcvConn :: DB.Connection -> SMPServer -> SMP.RecipientId -> IO (Either StoreError (RcvQueue, SomeConn)) +getRcvConn db ProtocolServer {host, port} rcvId = runExceptT $ do + rq@RcvQueue {connId} <- + ExceptT . firstRow toRcvQueue SEConnNotFound $ + DB.query db (rcvQueueQuery <> " WHERE q.host = ? AND q.port = ? AND q.rcv_id = ?") (host, port, rcvId) + (rq,) <$> ExceptT (getConn db connId) deleteConn :: DB.Connection -> ConnId -> IO () deleteConn db connId = @@ -446,19 +441,19 @@ setSndQueueStatus db SndQueue {sndId, server = ProtocolServer {host, port}} stat setRcvQueuePrimary :: DB.Connection -> ConnId -> RcvQueue -> IO () setRcvQueuePrimary db connId RcvQueue {dbQueueId} = do - DB.execute db "UPDATE rcv_queues SET rcv_primary = ?, next_rcv_primary = ? WHERE conn_id = ?" (False, False, connId) + DB.execute db "UPDATE rcv_queues SET rcv_primary = ? WHERE conn_id = ?" (False, connId) DB.execute db - "UPDATE rcv_queues SET rcv_primary = ?, next_rcv_primary = ?, replace_rcv_queue_id = ? WHERE conn_id = ? AND rcv_queue_id = ?" - (True, False, Nothing :: Maybe Int64, connId, dbQueueId) + "UPDATE rcv_queues SET rcv_primary = ?, replace_rcv_queue_id = ? WHERE conn_id = ? AND rcv_queue_id = ?" + (True, Nothing :: Maybe Int64, connId, dbQueueId) setSndQueuePrimary :: DB.Connection -> ConnId -> SndQueue -> IO () setSndQueuePrimary db connId SndQueue {dbQueueId} = do - DB.execute db "UPDATE snd_queues SET snd_primary = ?, next_snd_primary = ? WHERE conn_id = ?" (False, False, connId) + DB.execute db "UPDATE snd_queues SET snd_primary = ? WHERE conn_id = ?" (False, connId) DB.execute db - "UPDATE snd_queues SET snd_primary = ?, next_snd_primary = ?, replace_snd_queue_id = ? WHERE conn_id = ? AND snd_queue_id = ?" - (True, False, Nothing :: Maybe Int64, connId, dbQueueId) + "UPDATE snd_queues SET snd_primary = ?, replace_snd_queue_id = ? WHERE conn_id = ? AND snd_queue_id = ?" + (True, Nothing :: Maybe Int64, connId, dbQueueId) deleteConnRcvQueue :: DB.Connection -> ConnId -> RcvQueue -> IO () deleteConnRcvQueue db connId RcvQueue {dbQueueId} = @@ -475,7 +470,7 @@ getPrimaryRcvQueue db connId = getRcvQueue :: DB.Connection -> ConnId -> SMPServer -> SMP.RecipientId -> IO (Either StoreError RcvQueue) getRcvQueue db connId (SMPServer host port _) rcvId = - firstRow (toRcvQueue connId) SEConnNotFound $ + firstRow toRcvQueue SEConnNotFound $ DB.query db (rcvQueueQuery <> "WHERE q.conn_id = ? AND q.host = ? AND q.port = ? AND q.rcv_id = ?") (connId, host, port, rcvId) setRcvQueueNtfCreds :: DB.Connection -> ConnId -> Maybe ClientNtfCreds -> IO () @@ -949,9 +944,10 @@ getNtfSubscription db connId = _ -> Nothing in (NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus}, action) -createNtfSubscription :: DB.Connection -> NtfSubscription -> NtfSubAction -> NtfActionTs -> IO () -createNtfSubscription db ntfSubscription action actionTs = do +createNtfSubscription :: DB.Connection -> NtfSubscription -> NtfSubAction -> IO () +createNtfSubscription db ntfSubscription action = do let NtfSubscription {connId, smpServer = (SMPServer host port _), ntfQueueId, ntfServer = (NtfServer ntfHost ntfPort _), ntfSubId, ntfSubStatus} = ntfSubscription + actionTs <- liftIO getCurrentTime DB.execute db [sql| @@ -966,9 +962,9 @@ createNtfSubscription db ntfSubscription action actionTs = do where (ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction action -supervisorUpdateNtfSubscription :: DB.Connection -> NtfSubscription -> NtfSubAction -> NtfActionTs -> IO () -supervisorUpdateNtfSubscription db NtfSubscription {connId, ntfQueueId, ntfServer = (NtfServer ntfHost ntfPort _), ntfSubId, ntfSubStatus} action actionTs = do - updatedAt <- getCurrentTime +supervisorUpdateNtfSub :: DB.Connection -> NtfSubscription -> NtfSubAction -> IO () +supervisorUpdateNtfSub db NtfSubscription {connId, ntfQueueId, ntfServer = (NtfServer ntfHost ntfPort _), ntfSubId, ntfSubStatus} action = do + ts <- getCurrentTime DB.execute db [sql| @@ -976,13 +972,13 @@ supervisorUpdateNtfSubscription db NtfSubscription {connId, ntfQueueId, ntfServe SET smp_ntf_id = ?, ntf_host = ?, ntf_port = ?, ntf_sub_id = ?, ntf_sub_status = ?, ntf_sub_action = ?, ntf_sub_smp_action = ?, ntf_sub_action_ts = ?, updated_by_supervisor = ?, updated_at = ? WHERE conn_id = ? |] - ((ntfQueueId, ntfHost, ntfPort, ntfSubId) :. (ntfSubStatus, ntfSubAction, ntfSubSMPAction, actionTs, True, updatedAt, connId)) + ((ntfQueueId, ntfHost, ntfPort, ntfSubId) :. (ntfSubStatus, ntfSubAction, ntfSubSMPAction, ts, True, ts, connId)) where (ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction action -supervisorUpdateNtfSubAction :: DB.Connection -> ConnId -> NtfSubAction -> NtfActionTs -> IO () -supervisorUpdateNtfSubAction db connId action actionTs = do - updatedAt <- getCurrentTime +supervisorUpdateNtfAction :: DB.Connection -> ConnId -> NtfSubAction -> IO () +supervisorUpdateNtfAction db connId action = do + ts <- getCurrentTime DB.execute db [sql| @@ -990,7 +986,7 @@ supervisorUpdateNtfSubAction db connId action actionTs = do SET ntf_sub_action = ?, ntf_sub_smp_action = ?, ntf_sub_action_ts = ?, updated_by_supervisor = ?, updated_at = ? WHERE conn_id = ? |] - (ntfSubAction, ntfSubSMPAction, actionTs, True, updatedAt, connId) + (ntfSubAction, ntfSubSMPAction, ts, True, ts, connId) where (ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction action @@ -1285,9 +1281,9 @@ insertRcvQueue_ db connId' RcvQueue {..} = do db [sql| INSERT INTO rcv_queues - (host, port, rcv_id, conn_id, rcv_private_key, rcv_dh_secret, e2e_priv_key, e2e_dh_secret, snd_id, status, rcv_queue_id, rcv_primary, next_rcv_primary, replace_rcv_queue_id, smp_client_version) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?); + (host, port, rcv_id, conn_id, rcv_private_key, rcv_dh_secret, e2e_priv_key, e2e_dh_secret, snd_id, status, rcv_queue_id, rcv_primary, replace_rcv_queue_id, smp_client_version) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?); |] - ((host server, port server, rcvId, connId', rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret) :. (sndId, status, qId, primary, nextPrimary, dbReplaceQueueId, smpClientVersion)) + ((host server, port server, rcvId, connId', rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret) :. (sndId, status, qId, primary, dbReplaceQueueId, smpClientVersion)) pure qId -- * createSndConn helpers @@ -1299,9 +1295,9 @@ insertSndQueue_ db connId' SndQueue {..} = do db [sql| INSERT INTO snd_queues - (host, port, snd_id, conn_id, snd_public_key, snd_private_key, e2e_pub_key, e2e_dh_secret, status, snd_queue_id, snd_primary, next_snd_primary, replace_snd_queue_id, smp_client_version) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?); + (host, port, snd_id, conn_id, snd_public_key, snd_private_key, e2e_pub_key, e2e_dh_secret, status, snd_queue_id, snd_primary, replace_snd_queue_id, smp_client_version) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?); |] - ((host server, port server, sndId, connId', sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret) :. (status, qId, primary, nextPrimary, dbReplaceQueueId, smpClientVersion)) + ((host server, port server, sndId, connId', sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret) :. (status, qId, primary, dbReplaceQueueId, smpClientVersion)) pure qId newQueueId_ :: [Only Int64] -> Int64 @@ -1311,62 +1307,71 @@ newQueueId_ (Only maxId : _) = maxId + 1 -- * getConn helpers getConn :: DB.Connection -> ConnId -> IO (Either StoreError SomeConn) -getConn dbConn connId = +getConn db connId = getAnyConn db connId False + +getAnyConn :: DB.Connection -> ConnId -> Bool -> IO (Either StoreError SomeConn) +getAnyConn dbConn connId deleted' = getConnData dbConn connId >>= \case Nothing -> pure $ Left SEConnNotFound - Just (cData, cMode) -> do - rQ <- getRcvQueuesByConnId_ dbConn connId - sQ <- getSndQueuesByConnId_ dbConn connId - pure $ case (rQ, sQ, cMode) of - (Just rqs, Just sqs, CMInvitation) -> Right $ SomeConn SCDuplex (DuplexConnection cData rqs sqs) - (Just (rq :| _), Nothing, CMInvitation) -> Right $ SomeConn SCRcv (RcvConnection cData rq) - (Nothing, Just (sq :| _), CMInvitation) -> Right $ SomeConn SCSnd (SndConnection cData sq) - (Just (rq :| _), Nothing, CMContact) -> Right $ SomeConn SCContact (ContactConnection cData rq) - (Nothing, Nothing, _) -> Right $ SomeConn SCNew (NewConnection cData) - _ -> Left SEConnNotFound + Just (cData@ConnData {deleted}, cMode) + | deleted /= deleted' -> pure $ Left SEConnNotFound + | otherwise -> do + rQ <- getRcvQueuesByConnId_ dbConn connId + sQ <- getSndQueuesByConnId_ dbConn connId + pure $ case (rQ, sQ, cMode) of + (Just rqs, Just sqs, CMInvitation) -> Right $ SomeConn SCDuplex (DuplexConnection cData rqs sqs) + (Just (rq :| _), Nothing, CMInvitation) -> Right $ SomeConn SCRcv (RcvConnection cData rq) + (Nothing, Just (sq :| _), CMInvitation) -> Right $ SomeConn SCSnd (SndConnection cData sq) + (Just (rq :| _), Nothing, CMContact) -> Right $ SomeConn SCContact (ContactConnection cData rq) + (Nothing, Nothing, _) -> Right $ SomeConn SCNew (NewConnection cData) + _ -> Left SEConnNotFound getConnData :: DB.Connection -> ConnId -> IO (Maybe (ConnData, ConnectionMode)) getConnData dbConn connId' = - maybeFirstRow cData $ DB.query dbConn "SELECT conn_id, conn_mode, smp_agent_version, enable_ntfs, duplex_handshake FROM connections WHERE conn_id = ?;" (Only connId') + maybeFirstRow cData $ DB.query dbConn "SELECT conn_id, conn_mode, smp_agent_version, enable_ntfs, duplex_handshake, deleted FROM connections WHERE conn_id = ?;" (Only connId') where - cData (connId, cMode, connAgentVersion, enableNtfs_, duplexHandshake) = (ConnData {connId, connAgentVersion, enableNtfs = fromMaybe True enableNtfs_, duplexHandshake}, cMode) + cData (connId, cMode, connAgentVersion, enableNtfs_, duplexHandshake, deleted) = (ConnData {connId, connAgentVersion, enableNtfs = fromMaybe True enableNtfs_, duplexHandshake, deleted}, cMode) + +setConnDeleted :: DB.Connection -> ConnId -> IO () +setConnDeleted db connId = DB.execute db "UPDATE connections SET deleted = ? WHERE conn_id = ?" (True, connId) -- | returns all connection queues, the first queue is the primary one getRcvQueuesByConnId_ :: DB.Connection -> ConnId -> IO (Maybe (NonEmpty RcvQueue)) getRcvQueuesByConnId_ db connId = - L.nonEmpty . sortBy primaryFirst . map (toRcvQueue connId) + L.nonEmpty . sortBy primaryFirst . map toRcvQueue <$> DB.query db (rcvQueueQuery <> "WHERE q.conn_id = ?") (Only connId) where - primaryFirst RcvQueue {primary = p} RcvQueue {primary = p'} = compare (Down p) (Down p') + primaryFirst RcvQueue {primary = p, dbReplaceQueueId = i} RcvQueue {primary = p', dbReplaceQueueId = i'} = + -- the current primary queue is ordered first, the next primary - second + compare (Down p) (Down p') <> compare i i' rcvQueueQuery :: Query rcvQueueQuery = [sql| - SELECT s.key_hash, q.host, q.port, q.rcv_id, q.rcv_private_key, q.rcv_dh_secret, + SELECT s.key_hash, q.conn_id, q.host, q.port, q.rcv_id, q.rcv_private_key, q.rcv_dh_secret, q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.status, - q.rcv_queue_id, q.rcv_primary, q.next_rcv_primary, q.replace_rcv_queue_id, q.smp_client_version, + q.rcv_queue_id, q.rcv_primary, q.replace_rcv_queue_id, q.smp_client_version, q.ntf_public_key, q.ntf_private_key, q.ntf_id, q.rcv_ntf_dh_secret FROM rcv_queues q INNER JOIN servers s ON q.host = s.host AND q.port = s.port |] toRcvQueue :: - ConnId -> - (C.KeyHash, NonEmpty TransportHost, ServiceName, SMP.RecipientId, SMP.RcvPrivateSignKey, SMP.RcvDhSecret, C.PrivateKeyX25519, Maybe C.DhSecretX25519, SMP.SenderId, QueueStatus) - :. (Int64, Bool, Bool, Maybe Int64, Maybe Version) + (C.KeyHash, ConnId, NonEmpty TransportHost, ServiceName, SMP.RecipientId, SMP.RcvPrivateSignKey, SMP.RcvDhSecret, C.PrivateKeyX25519, Maybe C.DhSecretX25519, SMP.SenderId, QueueStatus) + :. (Int64, Bool, Maybe Int64, Maybe Version) :. (Maybe SMP.NtfPublicVerifyKey, Maybe SMP.NtfPrivateSignKey, Maybe SMP.NotifierId, Maybe RcvNtfDhSecret) -> RcvQueue -toRcvQueue connId ((keyHash, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status) :. (dbQueueId, primary, nextPrimary, dbReplaceQueueId, smpClientVersion_) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)) = +toRcvQueue ((keyHash, connId, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status) :. (dbQueueId, primary, dbReplaceQueueId, smpClientVersion_) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)) = let server = SMPServer host port keyHash smpClientVersion = fromMaybe 1 smpClientVersion_ clientNtfCreds = case (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_) of (Just ntfPublicKey, Just ntfPrivateKey, Just notifierId, Just rcvNtfDhSecret) -> Just $ ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret} _ -> Nothing - in RcvQueue {connId, server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, dbQueueId, primary, nextPrimary, dbReplaceQueueId, smpClientVersion, clientNtfCreds} + in RcvQueue {connId, server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, dbQueueId, primary, dbReplaceQueueId, smpClientVersion, clientNtfCreds} getRcvQueueById_ :: DB.Connection -> ConnId -> Int64 -> IO (Either StoreError RcvQueue) getRcvQueueById_ db connId dbRcvId = - firstRow (toRcvQueue connId) SEConnNotFound $ + firstRow toRcvQueue SEConnNotFound $ DB.query db (rcvQueueQuery <> " WHERE conn_id = ? AND rcv_queue_id = ?") (connId, dbRcvId) -- | returns all connection queues, the first queue is the primary one @@ -1376,17 +1381,19 @@ getSndQueuesByConnId_ dbConn connId = <$> DB.query dbConn [sql| - SELECT s.key_hash, q.host, q.port, q.snd_id, q.snd_public_key, q.snd_private_key, q.e2e_pub_key, q.e2e_dh_secret, q.status, q.snd_queue_id, q.snd_primary, q.next_snd_primary, q.replace_snd_queue_id, q.smp_client_version + SELECT s.key_hash, q.host, q.port, q.snd_id, q.snd_public_key, q.snd_private_key, q.e2e_pub_key, q.e2e_dh_secret, q.status, q.snd_queue_id, q.snd_primary, q.replace_snd_queue_id, q.smp_client_version FROM snd_queues q INNER JOIN servers s ON q.host = s.host AND q.port = s.port WHERE q.conn_id = ?; |] (Only connId) where - sndQueue ((keyHash, host, port, sndId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status) :. (dbQueueId, primary, nextPrimary, dbReplaceQueueId, smpClientVersion)) = + sndQueue ((keyHash, host, port, sndId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status) :. (dbQueueId, primary, dbReplaceQueueId, smpClientVersion)) = let server = SMPServer host port keyHash - in SndQueue {connId, server, sndId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status, dbQueueId, primary, nextPrimary, dbReplaceQueueId, smpClientVersion} - primaryFirst SndQueue {primary = p} SndQueue {primary = p'} = compare (Down p) (Down p') + in SndQueue {connId, server, sndId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status, dbQueueId, primary, dbReplaceQueueId, smpClientVersion} + primaryFirst SndQueue {primary = p, dbReplaceQueueId = i} SndQueue {primary = p', dbReplaceQueueId = i'} = + -- the current primary queue is ordered first, the next primary - second + compare (Down p) (Down p') <> compare i i' -- * updateRcvIds helpers diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220915_connection_queues.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220915_connection_queues.hs index f50295873..16d9d3692 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220915_connection_queues.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220915_connection_queues.hs @@ -18,9 +18,6 @@ CREATE UNIQUE INDEX idx_rcv_queue_id ON rcv_queues (conn_id, rcv_queue_id); ALTER TABLE rcv_queues ADD COLUMN rcv_primary INTEGER CHECK (rcv_primary NOT NULL); UPDATE rcv_queues SET rcv_primary = 1; -ALTER TABLE rcv_queues ADD COLUMN next_rcv_primary INTEGER CHECK (next_rcv_primary NOT NULL); -UPDATE rcv_queues SET next_rcv_primary = 0; - ALTER TABLE rcv_queues ADD COLUMN replace_rcv_queue_id INTEGER NULL; -- snd_queues @@ -31,11 +28,12 @@ CREATE UNIQUE INDEX idx_snd_queue_id ON snd_queues (conn_id, snd_queue_id); ALTER TABLE snd_queues ADD COLUMN snd_primary INTEGER CHECK (snd_primary NOT NULL); UPDATE snd_queues SET snd_primary = 1; -ALTER TABLE snd_queues ADD COLUMN next_snd_primary INTEGER CHECK (next_snd_primary NOT NULL); -UPDATE snd_queues SET next_snd_primary = 0; - ALTER TABLE snd_queues ADD COLUMN replace_snd_queue_id INTEGER NULL; +-- connections +ALTER TABLE connections ADD COLUMN deleted INTEGER DEFAULT 0 CHECK (deleted NOT NULL); +UPDATE connections SET deleted = 0; + -- messages CREATE TABLE snd_message_deliveries ( snd_message_delivery_id INTEGER PRIMARY KEY AUTOINCREMENT, diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index 1b833e7e1..693f8a73f 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -21,7 +21,8 @@ CREATE TABLE connections( smp_agent_version INTEGER NOT NULL DEFAULT 1 , duplex_handshake INTEGER NULL DEFAULT 0, - enable_ntfs INTEGER + enable_ntfs INTEGER, + deleted INTEGER DEFAULT 0 CHECK(deleted NOT NULL) ) WITHOUT ROWID; CREATE TABLE rcv_queues( host TEXT NOT NULL, @@ -43,7 +44,6 @@ CREATE TABLE rcv_queues( rcv_ntf_dh_secret BLOB, rcv_queue_id INTEGER CHECK(rcv_queue_id NOT NULL), rcv_primary INTEGER CHECK(rcv_primary NOT NULL), - next_rcv_primary INTEGER CHECK(next_rcv_primary NOT NULL), replace_rcv_queue_id INTEGER NULL, PRIMARY KEY(host, port, rcv_id), FOREIGN KEY(host, port) REFERENCES servers @@ -64,7 +64,6 @@ CREATE TABLE snd_queues( e2e_pub_key BLOB, snd_queue_id INTEGER CHECK(snd_queue_id NOT NULL), snd_primary INTEGER CHECK(snd_primary NOT NULL), - next_snd_primary INTEGER CHECK(next_snd_primary NOT NULL), replace_snd_queue_id INTEGER NULL, PRIMARY KEY(host, port, snd_id), FOREIGN KEY(host, port) REFERENCES servers diff --git a/src/Simplex/Messaging/Notifications/Types.hs b/src/Simplex/Messaging/Notifications/Types.hs index e8827b911..34a0079bb 100644 --- a/src/Simplex/Messaging/Notifications/Types.hs +++ b/src/Simplex/Messaging/Notifications/Types.hs @@ -87,6 +87,7 @@ isDeleteNtfSubAction = \case NSACreate -> False NSACheck -> False NSADelete -> True + NSARotate -> True NtfSubSMPAction a -> case a of NSASmpKey -> False NSASmpDelete -> True @@ -97,6 +98,7 @@ data NtfSubNTFAction = NSACreate | NSACheck | NSADelete + | NSARotate deriving (Show) instance Encoding NtfSubNTFAction where @@ -104,11 +106,13 @@ instance Encoding NtfSubNTFAction where NSACreate -> "N" NSACheck -> "C" NSADelete -> "D" + NSARotate -> "R" smpP = A.anyChar >>= \case 'N' -> pure NSACreate 'C' -> pure NSACheck 'D' -> pure NSADelete + 'R' -> pure NSARotate _ -> fail "bad NtfSubNTFAction" instance FromField NtfSubNTFAction where fromField = blobFieldDecoder smpDecode diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index b3a32985e..6fa506204 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -5,11 +5,15 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PatternSynonyms #-} +{-# LANGUAGE RankNTypes #-} {-# OPTIONS_GHC -fno-warn-incomplete-uni-patterns #-} module AgentTests.FunctionalAPITests ( functionalAPITests, + testServerMatrix2, makeConnection, + exchangeGreetingsMsgId, + switchComplete, get, (##>), (=##>), @@ -117,14 +121,12 @@ functionalAPITests t = do it "should accept connection using async command" $ withSmpServer t testAcceptContactAsync describe "Queue rotation" $ do - it "should switch delivery to the new queue (1 server)" $ - withSmpServer t $ testSwitchConnection initAgentServers - it "should switch delivery to the new queue (2 servers)" $ - withSmpServer t . withSmpServerOn t testPort2 $ testSwitchConnection initAgentServers2 - it "should switch to new queue asynchronously (1 server)" $ - withSmpServer t $ testSwitchAsync initAgentServers - it "should switch to new queue asynchronously (2 servers)" $ - withSmpServer t . withSmpServerOn t testPort2 $ testSwitchAsync initAgentServers2 + describe "should switch delivery to the new queue" $ + testServerMatrix2 t testSwitchConnection + describe "should switch to new queue asynchronously" $ + testServerMatrix2 t testSwitchAsync + describe "should delete connection during rotation" $ + testServerMatrix2 t testSwitchDelete testMatrix2 :: ATransport -> (AgentClient -> AgentClient -> AgentMsgId -> IO ()) -> Spec testMatrix2 t runTest = do @@ -140,6 +142,11 @@ testRatchetMatrix2 t runTest = do it "ratchets v1 to v2" $ withSmpServer t $ runTestCfg2 agentCfgRatchetV1 agentCfg 3 runTest it "ratchets v2 to v1" $ withSmpServer t $ runTestCfg2 agentCfg agentCfgRatchetV1 3 runTest +testServerMatrix2 :: ATransport -> (InitialAgentServers -> IO ()) -> Spec +testServerMatrix2 t runTest = do + it "1 server" $ withSmpServer t $ runTest initAgentServers + it "2 servers" $ withSmpServer t . withSmpServerOn t testPort2 $ runTest initAgentServers2 + runTestCfg2 :: AgentConfig -> AgentConfig -> AgentMsgId -> (AgentClient -> AgentClient -> AgentMsgId -> IO ()) -> IO () runTestCfg2 aliceCfg bobCfg baseMsgId runTest = do alice <- getSMPAgentClient aliceCfg initAgentServers @@ -651,22 +658,24 @@ testAcceptContactAsync = do testSwitchConnection :: InitialAgentServers -> IO () testSwitchConnection servers = do a <- getSMPAgentClient agentCfg servers - b <- getSMPAgentClient agentCfg {database = testDB2} servers + b <- getSMPAgentClient agentCfg {database = testDB2, initialClientId = 1} servers Right () <- runExceptT $ do (aId, bId) <- makeConnection a b exchangeGreetingsMsgId 4 a bId b aId switchConnectionAsync a "" bId - phase a bId SPStarted - phase b aId SPStarted - phase a bId SPConfirmed - phase b aId SPConfirmed - phase a bId SPTested - phase b aId SPTested - phase b aId SPCompleted - phase a bId SPCompleted - exchangeGreetingsMsgId 12 a bId b aId + switchComplete a bId b aId + exchangeGreetingsMsgId 10 a bId b aId pure () +switchComplete :: AgentClient -> ByteString -> AgentClient -> ByteString -> ExceptT AgentErrorType IO () +switchComplete a bId b aId = do + phase a bId SPStarted + phase b aId SPStarted + phase a bId SPConfirmed + phase b aId SPConfirmed + phase b aId SPCompleted + phase a bId SPCompleted + phase :: AgentClient -> ByteString -> SwitchPhase -> ExceptT AgentErrorType IO () phase c connId p = get c >>= \(_, connId', msg) -> do @@ -685,39 +694,51 @@ testSwitchAsync servers = do (aId, bId) <- makeConnection a b exchangeGreetingsMsgId 4 a bId b aId pure (aId, bId) - let phaseA = withA . phase' bId - phaseB = withB . phase' aId - Right () <- withA $ \a -> runExceptT $ do - subscribeConnection a bId + let withA' = session withA bId + withB' = session withB aId + withA' $ \a -> do switchConnectionAsync a "" bId phase a bId SPStarted - liftIO $ threadDelay 500000 - phaseB SPStarted - phaseA SPConfirmed - phaseB SPConfirmed - phaseA SPTested - Right () <- withB $ \b -> runExceptT $ do - subscribeConnection b aId - phase b aId SPTested + withB' $ \b -> phase b aId SPStarted + withA' $ \a -> phase a bId SPConfirmed + withB' $ \b -> do + phase b aId SPConfirmed phase b aId SPCompleted - phaseA SPCompleted + withA' $ \a -> phase a bId SPCompleted Right () <- withA $ \a -> withB $ \b -> runExceptT $ do subscribeConnection a bId subscribeConnection b aId - exchangeGreetingsMsgId 12 a bId b aId + exchangeGreetingsMsgId 10 a bId b aId pure () where withAgent :: AgentConfig -> (AgentClient -> IO a) -> IO a withAgent cfg' = bracket (getSMPAgentClient cfg' servers) disconnectAgentClient - phase' connId p c = do - Right () <- runExceptT $ do + session :: (forall a. (AgentClient -> IO a) -> IO a) -> ConnId -> (AgentClient -> ExceptT AgentErrorType IO ()) -> IO () + session withC connId a = do + Right () <- withC $ \c -> runExceptT $ do subscribeConnection c connId - phase c connId p + r <- a c liftIO $ threadDelay 500000 + pure r pure () withA = withAgent agentCfg withB = withAgent agentCfg {database = testDB2, initialClientId = 1} +testSwitchDelete :: InitialAgentServers -> IO () +testSwitchDelete servers = do + a <- getSMPAgentClient agentCfg servers + b <- getSMPAgentClient agentCfg {database = testDB2, initialClientId = 1} servers + Right () <- runExceptT $ do + (aId, bId) <- makeConnection a b + exchangeGreetingsMsgId 4 a bId b aId + disconnectAgentClient b + switchConnectionAsync a "" bId + phase a bId SPStarted + deleteConnectionAsync a "1" bId + ("1", bId', OK) <- get a + liftIO $ bId `shouldBe` bId' + pure () + exchangeGreetings :: AgentClient -> ConnId -> AgentClient -> ConnId -> ExceptT AgentErrorType IO () exchangeGreetings = exchangeGreetingsMsgId 4 diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index f9213c252..8d7e358fc 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -8,8 +8,7 @@ module AgentTests.NotificationTests where -- import Control.Logger.Simple (LogConfig (..), LogLevel (..), setLogLevel, withGlobalLogging) - -import AgentTests.FunctionalAPITests (get, makeConnection, (##>), (=##>), pattern Msg) +import AgentTests.FunctionalAPITests (exchangeGreetingsMsgId, get, makeConnection, switchComplete, testServerMatrix2, (##>), (=##>), pattern Msg) import Control.Concurrent (killThread, threadDelay) import Control.Monad.Except import qualified Data.Aeson as J @@ -22,7 +21,7 @@ import NtfClient import SMPAgentClient (agentCfg, initAgentServers, testDB, testDB2) import SMPClient (testPort, withSmpServer, withSmpServerStoreLogOn) import Simplex.Messaging.Agent -import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), databaseFile) +import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers, databaseFile) import Simplex.Messaging.Agent.Protocol import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String @@ -84,6 +83,10 @@ notificationTests t = it "should resume subscriptions after SMP server is restarted" $ \_ -> withAPNSMockServer $ \apns -> withNtfServer t $ testNotificationsSMPRestart t apns + describe "should switch notifications to the new queue" $ + testServerMatrix2 t $ \servers -> + withAPNSMockServer $ \apns -> + withNtfServer t $ testSwitchNotifications servers apns testNotificationToken :: APNSMockServer -> IO () testNotificationToken APNSMockServer {apnsQ} = do @@ -485,6 +488,28 @@ testNotificationsSMPRestart t APNSMockServer {apnsQ} = do liftIO $ killThread threadId pure () +testSwitchNotifications :: InitialAgentServers -> APNSMockServer -> IO () +testSwitchNotifications servers APNSMockServer {apnsQ} = do + a <- getSMPAgentClient agentCfg servers + b <- getSMPAgentClient agentCfg {database = testDB2, initialClientId = 1} servers + Right () <- runExceptT $ do + (aId, bId) <- makeConnection a b + exchangeGreetingsMsgId 4 a bId b aId + _ <- registerTestToken a "abcd" NMInstant apnsQ + liftIO $ threadDelay 250000 + let testMessage msg = do + msgId <- sendMessage b aId (SMP.MsgFlags True) msg + get b ##> ("", aId, SENT msgId) + void $ messageNotification apnsQ + get a =##> \case ("", c, Msg msg') -> c == bId && msg == msg'; _ -> False + ackMessage a bId msgId + testMessage "hello" + switchConnectionAsync a "" bId + switchComplete a bId b aId + liftIO $ threadDelay 500000 + testMessage "hello again" + pure () + messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString) messageNotification apnsQ = do 1000000 `timeout` atomically (readTBQueue apnsQ) >>= \case diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index 80baeeb69..072021461 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -140,7 +140,7 @@ testForeignKeysEnabled = `shouldThrow` (\e -> DB.sqlError e == DB.ErrorConstraint) cData1 :: ConnData -cData1 = ConnData {connId = "conn1", connAgentVersion = 1, enableNtfs = True, duplexHandshake = Nothing} +cData1 = ConnData {connId = "conn1", connAgentVersion = 1, enableNtfs = True, duplexHandshake = Nothing, deleted = False} testPrivateSignKey :: C.APrivateSignKey testPrivateSignKey = C.APrivateSignKey C.SEd25519 "MC4CAQAwBQYDK2VwBCIEIDfEfevydXXfKajz3sRkcQ7RPvfWUPoq6pu1TYHV1DEe" @@ -165,7 +165,6 @@ rcvQueue1 = status = New, dbQueueId = 1, primary = True, - nextPrimary = False, dbReplaceQueueId = Nothing, smpClientVersion = 1, clientNtfCreds = Nothing @@ -184,7 +183,6 @@ sndQueue1 = status = New, dbQueueId = 1, primary = True, - nextPrimary = False, dbReplaceQueueId = Nothing, smpClientVersion = 1 } @@ -222,7 +220,7 @@ testCreateRcvConnDuplicate = g <- newTVarIO =<< drgNew _ <- createRcvConn db g cData1 rcvQueue1 SCMInvitation createRcvConn db g cData1 rcvQueue1 SCMInvitation - `shouldReturn` Left (SEConnDuplicate) + `shouldReturn` Left SEConnDuplicate testCreateSndConn :: SpecWith SQLiteStore testCreateSndConn = @@ -257,7 +255,7 @@ testCreateSndConnDuplicate = g <- newTVarIO =<< drgNew _ <- createSndConn db g cData1 sndQueue1 createSndConn db g cData1 sndQueue1 - `shouldReturn` Left (SEConnDuplicate) + `shouldReturn` Left SEConnDuplicate testGetRcvConn :: SpecWith SQLiteStore testGetRcvConn = @@ -267,7 +265,7 @@ testGetRcvConn = g <- newTVarIO =<< drgNew _ <- createRcvConn db g cData1 rcvQueue1 SCMInvitation getRcvConn db smpServer recipientId - `shouldReturn` Right (SomeConn SCRcv (RcvConnection cData1 rcvQueue1)) + `shouldReturn` Right (rcvQueue1, SomeConn SCRcv (RcvConnection cData1 rcvQueue1)) testDeleteRcvConn :: SpecWith SQLiteStore testDeleteRcvConn = @@ -280,7 +278,7 @@ testDeleteRcvConn = `shouldReturn` () -- TODO check queues are deleted as well getConn db "conn1" - `shouldReturn` Left (SEConnNotFound) + `shouldReturn` Left SEConnNotFound testDeleteSndConn :: SpecWith SQLiteStore testDeleteSndConn = @@ -293,7 +291,7 @@ testDeleteSndConn = `shouldReturn` () -- TODO check queues are deleted as well getConn db "conn1" - `shouldReturn` Left (SEConnNotFound) + `shouldReturn` Left SEConnNotFound testDeleteDuplexConn :: SpecWith SQLiteStore testDeleteDuplexConn = @@ -326,7 +324,6 @@ testUpgradeRcvConnToDuplex = status = New, dbQueueId = 1, primary = True, - nextPrimary = False, dbReplaceQueueId = Nothing, smpClientVersion = 1 } @@ -354,7 +351,6 @@ testUpgradeSndConnToDuplex = status = New, dbQueueId = 1, primary = True, - nextPrimary = False, dbReplaceQueueId = Nothing, smpClientVersion = 1, clientNtfCreds = Nothing