simplify queue rotation protocol (#550)

* simplify queue rotation protocol

* use simplified rotation protocol, update tests

* simplify schema

* delete all connection queues

* refactor

* switch notifications to the new queue, tests

* remove TODO

* refactor

* rfc correction

Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com>

* remove duplicate set active

Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com>
This commit is contained in:
Evgeny Poberezkin
2022-10-31 09:33:28 +00:00
committed by GitHub
parent eb5c1c78cb
commit 8d9816809f
13 changed files with 408 additions and 361 deletions

View File

@@ -24,8 +24,6 @@ Additional agent messages required for the protocol:
QKEY_ -> "QK"
QUSE_ -> "QU"
QTEST_ -> "QT"
QDEL_ -> "QD"
QEND_ -> "QE"
`QADD`: add the new queue address(es), the same format as `REPLY` message, encoded as `QA`.
@@ -33,11 +31,7 @@ Additional agent messages required for the protocol:
`QUSE`: instruct the sender to use the new queue with sender's queue ID as parameter, encoded as `QU`.
`QTEST`: send test message to the new connection, encoded as `QT`. Any other message can be sent if available to continue rotation, the absence of this message is not an error.
`QDEL`: instruct the sender to stop using the previous queue, encoded as `QD`
`QEND`: notify the recipient that no messages will be sent to this queue, encoded as `QE`. The recipient will delete this queue.
`QTEST`: send test message to the new connection, encoded as `QT`. Any other message can be sent if available to continue rotation, the absence of this message is not an error. Once this message is successfully sent the sender will stop using the old queue. Once this message (or any other message in the new queue) is received, the recipient will stop using the old queue and delete it.
### Protocol
@@ -50,13 +44,10 @@ participant R' as Server that hosts the new A's receive queue
A ->> R': create new queue
A ->> S ->> B: QADD (R'): snd address of the new queue(s)
B ->> A(R) ->> A: QKEY (R'): sender's key for the new queue(s) (to avoid the race of SMP confirmation for the initial exchange)
B ->> S(R) ->> A: QKEY (R'): sender's key for the new queue(s) (to avoid the race of SMP confirmation for the initial exchange)
A ->> S(R'): secure new queue
A ->> S ->> B: QUSE (R'): instruction to use new queue(s)
B ->> A(R,R') ->> A: QTEST
B ->> A(R,R') ->> A: send all new messages to both queues
A ->> S ->> B: QDEL (R): instruction to delete the old queue
B ->> A(R') -> A: QEND (R): notification that no messages will be sent to the old queue
B ->> R' ->> A: send all new messages to the new queue only
A ->> S(R): DEL: delete the previous queue
B ->> S(R') ->> A: QTEST
A ->> S(R): DEL: delete the old queue
B ->> S(R') ->> A: send all new messages to the new queue
```

View File

@@ -348,7 +348,7 @@ newConnAsync :: forall m c. (AgentMonad m, ConnectionModeI c) => AgentClient ->
newConnAsync c corrId enableNtfs cMode = do
g <- asks idsDrg
connAgentVersion <- asks $ maxVersion . smpAgentVRange . config
let cData = ConnData {connId = "", connAgentVersion, enableNtfs, duplexHandshake = Nothing} -- connection mode is determined by the accepting agent
let cData = ConnData {connId = "", connAgentVersion, enableNtfs, duplexHandshake = Nothing, deleted = False} -- connection mode is determined by the accepting agent
connId <- withStore c $ \db -> createNewConn db g cData cMode
enqueueCommand c corrId connId Nothing $ AClientCommand $ NEW enableNtfs (ACM cMode)
pure connId
@@ -360,7 +360,7 @@ joinConnAsync c corrId enableNtfs cReqUri@(CRInvitationUri (ConnReqUriData _ age
Just (Compatible connAgentVersion) -> do
g <- asks idsDrg
let duplexHS = connAgentVersion /= 1
cData = ConnData {connId = "", connAgentVersion, enableNtfs, duplexHandshake = Just duplexHS}
cData = ConnData {connId = "", connAgentVersion, enableNtfs, duplexHandshake = Just duplexHS, deleted = False}
connId <- withStore c $ \db -> createNewConn db g cData SCMInvitation
enqueueCommand c corrId connId Nothing $ AClientCommand $ JOIN enableNtfs (ACR sConnectionMode cReqUri) cInfo
pure connId
@@ -402,20 +402,22 @@ ackMessageAsync' c corrId connId msgId = do
enqueueCommand c corrId connId (Just server) . AClientCommand $ ACK msgId
deleteConnectionAsync' :: forall m. AgentMonad m => AgentClient -> ACorrId -> ConnId -> m ()
deleteConnectionAsync' c@AgentClient {subQ} corrId connId =
withStore c (`getConn` connId) >>= \case
-- TODO *** delete all queues
SomeConn _ (DuplexConnection _ (rq :| _) _) -> enqueueDelete rq
SomeConn _ (RcvConnection _ rq) -> enqueueDelete rq
SomeConn _ (ContactConnection _ rq) -> enqueueDelete rq
SomeConn _ (SndConnection _ _) -> withStore' c (`deleteConn` connId) >> notifyDeleted
SomeConn _ (NewConnection _) -> withStore' c (`deleteConn` connId) >> notifyDeleted
deleteConnectionAsync' c@AgentClient {subQ} corrId connId = withConnLock c connId "deleteConnectionAsync" $ do
SomeConn _ conn <- withStore c (`getConn` connId)
case conn of
DuplexConnection _ (rq :| _) _ -> enqueueDelete rq
RcvConnection _ rq -> enqueueDelete rq
ContactConnection _ rq -> enqueueDelete rq
SndConnection _ _ -> delete
NewConnection _ -> delete
where
enqueueDelete :: RcvQueue -> m ()
enqueueDelete RcvQueue {server} =
enqueueCommand c corrId connId (Just server) $ AClientCommand DEL
notifyDeleted :: m ()
notifyDeleted = atomically $ writeTBQueue subQ (corrId, connId, OK)
enqueueDelete RcvQueue {server} = do
withStore' c $ \db -> setConnDeleted db connId
disableConn c connId
enqueueCommand c corrId connId (Just server) $ AInternalCommand ICDeleteConn
delete :: m ()
delete = withStore' c (`deleteConn` connId) >> atomically (writeTBQueue subQ (corrId, connId, OK))
-- | Add connection to the new receive queue
switchConnectionAsync' :: AgentMonad m => AgentClient -> ACorrId -> ConnId -> m ()
@@ -451,7 +453,7 @@ newConnSrv c connId asyncMode enableNtfs cMode srv = do
pure connId
setUpConn False rq connAgentVersion = do
g <- asks idsDrg
let cData = ConnData {connId, connAgentVersion, enableNtfs, duplexHandshake = Nothing} -- connection mode is determined by the accepting agent
let cData = ConnData {connId, connAgentVersion, enableNtfs, duplexHandshake = Nothing, deleted = False} -- connection mode is determined by the accepting agent
withStore c $ \db -> createRcvConn db g cData rq cMode
joinConn :: AgentMonad m => AgentClient -> ConnId -> Bool -> Bool -> ConnectionRequestUri c -> ConnInfo -> m ConnId
@@ -475,7 +477,7 @@ joinConnSrv c connId asyncMode enableNtfs (CRInvitationUri (ConnReqUriData _ age
let rc = CR.initSndRatchet e2eEncryptVRange rcDHRr rcDHRs $ CR.x3dhSnd pk1 pk2 e2eRcvParams
q <- newSndQueue "" qInfo
let duplexHS = connAgentVersion /= 1
cData = ConnData {connId, connAgentVersion, enableNtfs, duplexHandshake = Just duplexHS}
cData = ConnData {connId, connAgentVersion, enableNtfs, duplexHandshake = Just duplexHS, deleted = False}
connId' <- setUpConn asyncMode cData q rc
let sq = (q :: SndQueue) {connId = connId'}
cData' = (cData :: ConnData) {connId = connId'}
@@ -558,23 +560,23 @@ rejectContact' c contactConnId invId =
-- | Subscribe to receive connection messages (SUB command) in Reader monad
subscribeConnection' :: forall m. AgentMonad m => AgentClient -> ConnId -> m ()
subscribeConnection' c connId =
withStore c (`getConn` connId) >>= \conn -> do
resumeConnCmds c connId
case conn of
SomeConn _ (DuplexConnection cData (rq :| rqs) sqs) -> do
mapM_ (resumeMsgDelivery c cData) sqs
subscribe cData rq
mapM_ (\q -> subscribeQueue c q `catchError` \_ -> pure ()) rqs
SomeConn _ (SndConnection cData sq) -> do
resumeMsgDelivery c cData sq
case status (sq :: SndQueue) of
Confirmed -> pure ()
Active -> throwError $ CONN SIMPLEX
_ -> throwError $ INTERNAL "unexpected queue status"
SomeConn _ (RcvConnection cData rq) -> subscribe cData rq
SomeConn _ (ContactConnection cData rq) -> subscribe cData rq
SomeConn _ (NewConnection _) -> pure ()
subscribeConnection' c connId = do
SomeConn _ conn <- withStore c (`getConn` connId)
resumeConnCmds c connId
case conn of
DuplexConnection cData (rq :| rqs) sqs -> do
mapM_ (resumeMsgDelivery c cData) sqs
subscribe cData rq
mapM_ (\q -> subscribeQueue c q `catchError` \_ -> pure ()) rqs
SndConnection cData sq -> do
resumeMsgDelivery c cData sq
case status (sq :: SndQueue) of
Confirmed -> pure ()
Active -> throwError $ CONN SIMPLEX
_ -> throwError $ INTERNAL "unexpected queue status"
RcvConnection cData rq -> subscribe cData rq
ContactConnection cData rq -> subscribe cData rq
NewConnection _ -> pure ()
where
subscribe :: ConnData -> RcvQueue -> m ()
subscribe ConnData {enableNtfs} rq = do
@@ -603,12 +605,12 @@ subscribeConnections' c connIds = do
pure rs
where
rcvQueueOrResult :: SomeConn -> Either (Either AgentErrorType ()) (NonEmpty RcvQueue)
rcvQueueOrResult = \case
SomeConn _ (DuplexConnection _ rqs _) -> Right rqs
SomeConn _ (SndConnection _ sq) -> Left $ sndSubResult sq
SomeConn _ (RcvConnection _ rq) -> Right [rq]
SomeConn _ (ContactConnection _ rq) -> Right [rq]
SomeConn _ (NewConnection _) -> Left (Right ())
rcvQueueOrResult (SomeConn _ conn) = case conn of
DuplexConnection _ rqs _ -> Right rqs
SndConnection _ sq -> Left $ sndSubResult sq
RcvConnection _ rq -> Right [rq]
ContactConnection _ rq -> Right [rq]
NewConnection _ -> Left (Right ())
sndSubResult :: SndQueue -> Either AgentErrorType ()
sndSubResult sq = case status (sq :: SndQueue) of
Confirmed -> Right ()
@@ -643,9 +645,9 @@ subscribeConnections' c connIds = do
_ -> pure ()
_ -> pure ()
sndQueue :: SomeConn -> Maybe (ConnData, NonEmpty SndQueue)
sndQueue = \case
SomeConn _ (DuplexConnection cData _ sqs) -> Just (cData, sqs)
SomeConn _ (SndConnection cData sq) -> Just (cData, [sq])
sndQueue (SomeConn _ conn) = case conn of
DuplexConnection cData _ sqs -> Just (cData, sqs)
SndConnection cData sq -> Just (cData, [sq])
_ -> Nothing
notifyResultError :: Map ConnId (Either AgentErrorType ()) -> m ()
notifyResultError rs = do
@@ -671,12 +673,13 @@ resubscribeConnections' c connIds = do
getConnectionMessage' :: AgentMonad m => AgentClient -> ConnId -> m (Maybe SMPMsgMeta)
getConnectionMessage' c connId = do
whenM (atomically $ hasActiveSubscription c connId) . throwError $ CMD PROHIBITED
withStore c (`getConn` connId) >>= \case
SomeConn _ (DuplexConnection _ (rq :| _) _) -> getQueueMessage c rq
SomeConn _ (RcvConnection _ rq) -> getQueueMessage c rq
SomeConn _ (ContactConnection _ rq) -> getQueueMessage c rq
SomeConn _ SndConnection {} -> throwError $ CONN SIMPLEX
SomeConn _ NewConnection {} -> throwError $ CMD PROHIBITED
SomeConn _ conn <- withStore c (`getConn` connId)
case conn of
DuplexConnection _ (rq :| _) _ -> getQueueMessage c rq
RcvConnection _ rq -> getQueueMessage c rq
ContactConnection _ rq -> getQueueMessage c rq
SndConnection _ _ -> throwError $ CONN SIMPLEX
NewConnection _ -> throwError $ CMD PROHIBITED
getNotificationMessage' :: forall m. AgentMonad m => AgentClient -> C.CbNonce -> ByteString -> m (NotificationInfo, [SMPMsgMeta])
getNotificationMessage' c nonce encNtfInfo = do
@@ -708,9 +711,10 @@ getNotificationMessage' c nonce encNtfInfo = do
-- | Send message to the connection (SEND command) in Reader monad
sendMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> MsgFlags -> MsgBody -> m AgentMsgId
sendMessage' c connId msgFlags msg = withConnLock c connId "sendMessage" $ do
withStore c (`getConn` connId) >>= \case
SomeConn _ (DuplexConnection cData _ sqs) -> enqueueMsgs cData sqs
SomeConn _ (SndConnection cData sq) -> enqueueMsgs cData [sq]
SomeConn _ conn <- withStore c (`getConn` connId)
case conn of
DuplexConnection cData _ sqs -> enqueueMsgs cData sqs
SndConnection cData sq -> enqueueMsgs cData [sq]
_ -> throwError $ CONN SIMPLEX
where
enqueueMsgs :: ConnData -> NonEmpty SndQueue -> m AgentMsgId
@@ -805,6 +809,23 @@ runCommandProcessing c@AgentClient {subQ} server_ = do
secure rq senderKey
when (duplexHandshake cData == Just True) . void $
enqueueMessage c cData sq SMP.MsgFlags {notification = True} HELLO
ICDeleteConn ->
withServer $ \srv -> tryWithLock "ICDeleteConn" $ do
SomeConn _ conn <- withStore c $ \db -> getAnyConn db connId True
case conn of
DuplexConnection _ (rq :| rqs) _ -> delete srv rq $ case rqs of
[] -> notify OK
RcvQueue {server = srv'} : _ -> enqueue srv'
RcvConnection _ rq -> delete srv rq $ notify OK
ContactConnection _ rq -> delete srv rq $ notify OK
_ -> internalErr "command requires connection with rcv queue"
where
delete :: SMPServer -> RcvQueue -> m () -> m ()
delete srv rq@RcvQueue {server} next
| sameSrvAddr srv server = deleteConnQueue c rq >> next
| otherwise = enqueue server
enqueue :: SMPServer -> m ()
enqueue srv = enqueueCommand c corrId connId (Just srv) $ AInternalCommand ICDeleteConn
ICQSecure rId senderKey ->
withServer $ \srv -> tryWithLock "ICQSecure" . withDuplexConn $ \(DuplexConnection cData rqs sqs) ->
case find (sameQueue (srv, rId)) rqs of
@@ -822,6 +843,9 @@ runCommandProcessing c@AgentClient {subQ} server_ = do
| otherwise -> do
deleteQueue c rq'
withStore' c $ \db -> deleteConnRcvQueue db connId rq'
when (enableNtfs cData) $ do
ns <- asks ntfSupervisor
atomically $ sendNtfSubCommand ns (connId, NSCCreate)
let conn' = DuplexConnection cData (rq'' :| rqs') sqs
notify $ SWITCH SPCompleted $ connectionStats conn'
_ -> internalErr "ICQDelete: cannot delete the only queue in connection"
@@ -957,7 +981,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
_ -> sendAgentMessage c sq msgFlags msgBody
case resp of
Left e -> do
let err = if msgType == AM_CONN_INFO then ERR e else MERR mId e
let err = if msgType == AM_A_MSG_ then MERR mId e else ERR e
case e of
SMP SMP.QUOTA -> case msgType of
AM_CONN_INFO -> connError msgId NOT_AVAILABLE
@@ -978,14 +1002,12 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
Just _ -> connError msgId NOT_AVAILABLE
-- party joining connection
_ -> connError msgId NOT_ACCEPTED
AM_REPLY_ -> notifyDel msgId $ ERR e
AM_A_MSG_ -> notifyDel msgId $ MERR mId e
AM_QADD_ -> pure ()
AM_QKEY_ -> pure ()
AM_QUSE_ -> pure ()
AM_QTEST_ -> pure ()
AM_QDEL_ -> pure ()
AM_QEND_ -> pure ()
AM_REPLY_ -> notifyDel msgId err
AM_A_MSG_ -> notifyDel msgId err
AM_QADD_ -> qError msgId "QADD: AUTH"
AM_QKEY_ -> qError msgId "QKEY: AUTH"
AM_QUSE_ -> qError msgId "QUSE: AUTH"
AM_QTEST_ -> qError msgId "QTEST: AUTH"
_
-- for other operations BROKER HOST is treated as a permanent error (e.g., when connecting to the server),
-- the message sending would be retried
@@ -1034,11 +1056,30 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
AM_QADD_ -> pure ()
AM_QKEY_ -> pure ()
AM_QUSE_ -> pure ()
AM_QTEST_ ->
AM_QTEST_ -> do
withStore' c $ \db -> setSndQueueStatus db sq Active
AM_QDEL_ -> pure ()
AM_QEND_ ->
getConnectionServers' c connId >>= notify . SWITCH SPCompleted
SomeConn _ conn <- withStore c (`getConn` connId)
case conn of
DuplexConnection cData' rqs sqs -> do
-- remove old snd queue from connection once QTEST is sent to the new queue
case findQ (qAddress sq) sqs of
-- this is the same queue where this loop delivers messages to but with updated state
Just SndQueue {dbReplaceQueueId = Just replacedId, primary} ->
case removeQP (\SndQueue {dbQueueId} -> dbQueueId == replacedId) sqs of
Nothing -> internalErr msgId "sent QTEST: queue not found in connection"
Just (sq', sq'' : sqs') -> do
-- remove the delivery from the map to stop the thread when the delivery loop is complete
atomically $ TM.delete (qAddress sq') $ smpQueueMsgQueues c
withStore' c $ \db -> do
when primary $ setSndQueuePrimary db connId sq'
deletePendingMsgs db connId sq'
deleteConnSndQueue db connId sq'
let sqs'' = sq'' :| sqs'
conn' = DuplexConnection cData' rqs sqs''
notify . SWITCH SPCompleted $ connectionStats conn'
_ -> internalErr msgId "sent QTEST: there is only one queue in connection"
_ -> internalErr msgId "sent QTEST: queue not in connection or not replacing another queue"
_ -> internalErr msgId "QTEST sent not in duplex connection"
delMsg msgId
where
delMsg :: InternalId -> m ()
@@ -1048,6 +1089,8 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
notifyDel :: InternalId -> ACommand 'Agent -> m ()
notifyDel msgId cmd = notify cmd >> delMsg msgId
connError msgId = notifyDel msgId . ERR . CONN
qError msgId = notifyDel msgId . ERR . AGENT . A_QUEUE
internalErr msgId = notifyDel msgId . ERR . INTERNAL
retrySndOp :: AgentMonad m => AgentClient -> m () -> m ()
retrySndOp c loop = do
@@ -1084,7 +1127,7 @@ switchConnection' c connId = withConnLock c connId "switchConnection" $ do
srv <- getNextSMPServer c $ map qServer (L.toList rqs) <> map qServer (L.toList sqs)
srv' <- if srv == server then getNextSMPServer c [server] else pure srv
(q, qUri) <- newRcvQueue c connId srv' clientVRange
let rq' = (q :: RcvQueue) {primary = False, nextPrimary = True, dbReplaceQueueId = Just dbQueueId}
let rq' = (q :: RcvQueue) {primary = True, dbReplaceQueueId = Just dbQueueId}
void . withStore c $ \db -> addConnRcvQueue db connId rq'
addSubscription c rq'
void . enqueueMessages c cData sqs SMP.noMsgFlags $ QADD [(qUri, Just (server, sndId))]
@@ -1100,30 +1143,39 @@ ackQueueMessage c rq srvMsgId =
-- | Suspend SMP agent connection (OFF command) in Reader monad
suspendConnection' :: AgentMonad m => AgentClient -> ConnId -> m ()
suspendConnection' c connId = withConnLock c connId "suspendConnection" $ do
withStore c (`getConn` connId) >>= \case
SomeConn _ (DuplexConnection _ rqs _) -> mapM_ (suspendQueue c) rqs
SomeConn _ (RcvConnection _ rq) -> suspendQueue c rq
SomeConn _ (ContactConnection _ rq) -> suspendQueue c rq
SomeConn _ (SndConnection _ _) -> throwError $ CONN SIMPLEX
SomeConn _ (NewConnection _) -> throwError $ CMD PROHIBITED
SomeConn _ conn <- withStore c (`getConn` connId)
case conn of
DuplexConnection _ rqs _ -> mapM_ (suspendQueue c) rqs
RcvConnection _ rq -> suspendQueue c rq
ContactConnection _ rq -> suspendQueue c rq
SndConnection _ _ -> throwError $ CONN SIMPLEX
NewConnection _ -> throwError $ CMD PROHIBITED
-- | Delete SMP agent connection (DEL command) in Reader monad
deleteConnection' :: forall m. AgentMonad m => AgentClient -> ConnId -> m ()
deleteConnection' c connId = withConnLock c connId "deleteConnection" $ do
withStore c (`getConn` connId) >>= \case
SomeConn _ (DuplexConnection _ rqs _) -> mapM_ delete rqs
SomeConn _ (RcvConnection _ rq) -> delete rq
SomeConn _ (ContactConnection _ rq) -> delete rq
SomeConn _ (SndConnection _ _) -> withStore' c (`deleteConn` connId)
SomeConn _ (NewConnection _) -> withStore' c (`deleteConn` connId)
SomeConn _ conn <- withStore c (`getConn` connId)
case conn of
DuplexConnection _ rqs _ -> mapM_ (deleteConnQueue c) rqs >> disableConn c connId >> deleteConn'
RcvConnection _ rq -> delete rq
ContactConnection _ rq -> delete rq
SndConnection _ _ -> deleteConn'
NewConnection _ -> deleteConn'
where
delete :: RcvQueue -> m ()
delete rq = do
deleteQueue c rq
atomically $ removeSubscription c connId
withStore' c (`deleteConn` connId)
ns <- asks ntfSupervisor
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCDelete)
delete rq = deleteConnQueue c rq >> disableConn c connId >> deleteConn'
deleteConn' = withStore' c (`deleteConn` connId)
deleteConnQueue :: AgentMonad m => AgentClient -> RcvQueue -> m ()
deleteConnQueue c rq@RcvQueue {connId} = do
deleteQueue c rq
withStore' c $ \db -> deleteConnRcvQueue db connId rq
disableConn :: AgentMonad m => AgentClient -> ConnId -> m ()
disableConn c connId = do
atomically $ removeSubscription c connId
ns <- asks ntfSupervisor
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCDelete)
getConnectionServers' :: AgentMonad m => AgentClient -> ConnId -> m ConnectionStats
getConnectionServers' c connId = do
@@ -1268,10 +1320,11 @@ getNtfTokenData' c =
-- | Set connection notifications, in Reader monad
toggleConnectionNtfs' :: forall m. AgentMonad m => AgentClient -> ConnId -> Bool -> m ()
toggleConnectionNtfs' c connId enable = do
withStore c (`getConn` connId) >>= \case
SomeConn _ (DuplexConnection cData _ _) -> toggle cData
SomeConn _ (RcvConnection cData _) -> toggle cData
SomeConn _ (ContactConnection cData _) -> toggle cData
SomeConn _ conn <- withStore c (`getConn` connId)
case conn of
DuplexConnection cData _ _ -> toggle cData
RcvConnection cData _ -> toggle cData
ContactConnection cData _ -> toggle cData
_ -> throwError $ CONN SIMPLEX
where
toggle :: ConnData -> m ()
@@ -1409,18 +1462,12 @@ subscriber c@AgentClient {msgQ} = forever $ do
Right _ -> return ()
processSMPTransmission :: forall m. AgentMonad m => AgentClient -> ServerTransmission BrokerMsg -> m ()
processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cmd) =
withStore c (\db -> getRcvConn db srv rId) >>= \case
-- TODO *** get queue separately?
SomeConn _ conn@(DuplexConnection cData rqs _) -> case find (sameQueue (srv, rId)) rqs of
Just rq -> processSMP conn cData rq
_ -> atomically $ writeTBQueue subQ ("", "", ERR $ CONN NOT_FOUND)
SomeConn _ conn@(RcvConnection cData rq) -> processSMP conn cData rq
SomeConn _ conn@(ContactConnection cData rq) -> processSMP conn cData rq
_ -> atomically $ writeTBQueue subQ ("", "", ERR $ CONN NOT_FOUND)
processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cmd) = do
(rq, SomeConn _ conn) <- withStore c (\db -> getRcvConn db srv rId)
processSMP rq conn $ connData conn
where
processSMP :: Connection c -> ConnData -> RcvQueue -> m ()
processSMP conn cData@ConnData {connId, duplexHandshake} rq@RcvQueue {e2ePrivKey, e2eDhSecret, status} = withConnLock c connId "processSMP" $
processSMP :: RcvQueue -> Connection c -> ConnData -> m ()
processSMP rq@RcvQueue {e2ePrivKey, e2eDhSecret, status} conn cData@ConnData {connId, duplexHandshake} = withConnLock c connId "processSMP" $
case cmd of
SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId} -> handleNotifyAck $ do
SMP.ClientRcvMsgBody {msgTs = srvTs, msgFlags, msgBody} <- decryptSMPMessage v rq msg
@@ -1441,17 +1488,15 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
decryptClientMessage e2eDh clientMsg >>= \case
(SMP.PHEmpty, AgentMsgEnvelope _ encAgentMsg) -> do
-- primary queue is set as Active in helloMsg, below is to set additional queues Active
let RcvQueue {primary, nextPrimary, dbReplaceQueueId} = rq
unless primary . withStore' c $ \db -> do
unless (status == Active) $ setRcvQueueStatus db rq Active
when nextPrimary $ setRcvQueuePrimary db connId rq
let RcvQueue {primary, dbReplaceQueueId} = rq
unless (status == Active) . withStore' c $ \db -> setRcvQueueStatus db rq Active
case (conn, dbReplaceQueueId) of
(DuplexConnection _ rqs sqs, Just dbRcvId) ->
case find (\RcvQueue {dbQueueId} -> dbQueueId == dbRcvId) rqs of
Just RcvQueue {server, sndId} -> do
void . enqueueMessages c cData sqs SMP.noMsgFlags $ QDEL [(server, sndId)]
notify . SWITCH SPTested $ connectionStats conn
_ -> throwError $ INTERNAL "replaced RcvQueue not found in connection"
(DuplexConnection _ rqs _, Just replacedId) -> do
when primary . withStore' c $ \db -> setRcvQueuePrimary db connId rq
case find (\RcvQueue {dbQueueId} -> dbQueueId == replacedId) rqs of
Just RcvQueue {server, rcvId} -> do
enqueueCommand c "" connId (Just server) $ AInternalCommand $ ICQDelete rcvId
_ -> notify . ERR . AGENT $ A_QUEUE "replaced RcvQueue not found in connection"
_ -> pure ()
tryError agentClientMsg >>= \case
Right (Just (msgId, msgMeta, aMessage)) -> case aMessage of
@@ -1467,8 +1512,6 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
-- no action needed for QTEST
-- any message in the new queue will mark it active and trigger deletion of the old queue
QTEST _ -> logServer "<--" c srv rId "MSG <QTEST>" >> ackDel msgId
QDEL qs -> qDuplex "QDEL" $ qDelMsg qs
QEND qs -> qDuplex "QEND" $ qEndMsg qs
where
qDuplex :: String -> (Connection 'CDuplex -> m ()) -> m ()
qDuplex name a = case conn of
@@ -1606,8 +1649,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
logServer "<--" c srv rId "MSG <HELLO>"
case status of
Active -> prohibited
_ -> do
withStore' c $ \db -> setRcvQueueStatus db rq Active
_ ->
case conn of
DuplexConnection _ _ (sq@SndQueue {status = sndStatus} :| _)
-- `sndStatus == Active` when HELLO was previously sent, and this is the reply HELLO
@@ -1644,7 +1686,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
(Just _, _) -> qError "QADD: queue address is already used in connection"
(_, Just _replaced@SndQueue {dbQueueId}) -> do
sq_@SndQueue {sndPublicKey, e2ePubKey} <- newSndQueue connId qInfo
let sq' = (sq_ :: SndQueue) {nextPrimary = True, dbReplaceQueueId = Just dbQueueId}
let sq' = (sq_ :: SndQueue) {primary = True, dbReplaceQueueId = Just dbQueueId}
void . withStore c $ \db -> addConnSndQueue db connId sq'
case (sndPublicKey, e2ePubKey) of
(Just sndPubKey, Just dhPublicKey) -> do
@@ -1678,47 +1720,18 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
-- processed by queue sender
-- mark queue as Secured and to start sending messages to it
qUseMsg :: NonEmpty ((SMPServer, SMP.SenderId), Bool) -> Connection 'CDuplex -> m ()
qUseMsg ((addr, primary) :| _) (DuplexConnection _ _ sqs) =
case removeQ addr sqs of
Just (sq', sqs') -> do
-- NOTE: does not yet support the change of the primary status during the rotation
qUseMsg ((addr, _primary) :| _) (DuplexConnection _ _ sqs) =
case findQ addr sqs of
Just sq' -> do
logServer "<--" c srv rId $ "MSG <QUSE> " <> logSecret (snd addr)
withStore' c $ \db -> do
setSndQueueStatus db sq' Secured
when primary $ setSndQueuePrimary db connId sq'
let sq'' = (sq' :: SndQueue) {status = Secured, primary}
void $ enqueueMessages c cData (sq'' :| sqs') SMP.noMsgFlags $ QTEST [addr]
withStore' c $ \db -> setSndQueueStatus db sq' Secured
let sq'' = (sq' :: SndQueue) {status = Secured}
-- sending QTEST to the new queue only, the old one will be removed if sent successfully
void $ enqueueMessages c cData [sq''] SMP.noMsgFlags $ QTEST [addr]
notify . SWITCH SPConfirmed $ connectionStats conn
_ -> qError "QUSE: queue address not found in connection"
-- processed by queue sender
-- remove snd queue from connection and enqueue QEND message
qDelMsg :: NonEmpty (SMPServer, SMP.SenderId) -> Connection 'CDuplex -> m ()
qDelMsg (addr :| _) (DuplexConnection _ rqs sqs) =
case removeQ addr sqs of
Nothing -> logServer "<--" c srv rId "MSG <QDEL>: queue not found (already deleted?)"
Just (sq, sq' : sqs') -> do
logServer "<--" c srv rId $ "MSG <QDEL> " <> logSecret (snd addr)
-- remove the delivery from the map to stop the thread when the delivery loop is complete
atomically $ TM.delete addr $ smpQueueMsgQueues c
withStore' c $ \db -> do
deletePendingMsgs db connId sq
deleteConnSndQueue db connId sq
let sqs'' = sq' :| sqs'
conn' = DuplexConnection cData rqs sqs''
void $ enqueueMessages c cData sqs'' SMP.noMsgFlags $ QEND [addr]
notify . SWITCH SPTested $ connectionStats conn'
_ -> qError "QDEL received to the only queue in connection"
-- received by party initiating switch
-- TODO *** check that the received address matches expectations
qEndMsg :: NonEmpty (SMPServer, SMP.SenderId) -> Connection 'CDuplex -> m ()
qEndMsg (addr@(smpServer, senderId) :| _) (DuplexConnection _ rqs _) =
case findRQ addr rqs of
Just RcvQueue {rcvId} -> do
logServer "<--" c srv rId $ "MSG <QEND> " <> logSecret senderId
enqueueCommand c "" connId (Just smpServer) $ AInternalCommand $ ICQDelete rcvId
_ -> qError "QEND: queue address not found in connection"
qError :: String -> m ()
qError = throwError . AGENT . A_QUEUE
@@ -1835,7 +1848,6 @@ newSndQueue_ a connId (Compatible (SMPQueueInfo smpClientVersion SMPQueueAddress
status = New,
dbQueueId = 0,
primary = True,
nextPrimary = False,
dbReplaceQueueId = Nothing,
smpClientVersion
}

View File

@@ -549,7 +549,6 @@ newRcvQueue_ a c connId srv vRange = do
status = New,
dbQueueId = 0,
primary = True,
nextPrimary = False,
dbReplaceQueueId = Nothing,
smpClientVersion = maxVersion vRange,
clientNtfCreds = Nothing

View File

@@ -82,43 +82,48 @@ processNtfSub c (connId, cmd) = do
case clientNtfCreds of
Just ClientNtfCreds {notifierId} -> do
let newSub = newNtfSubscription connId smpServer (Just notifierId) ntfServer NASKey
ts <- liftIO getCurrentTime
withStore' c $ \db -> createNtfSubscription db newSub (NtfSubNTFAction NSACreate) ts
withStore' c $ \db -> createNtfSubscription db newSub $ NtfSubNTFAction NSACreate
addNtfNTFWorker ntfServer
Nothing -> do
let newSub = newNtfSubscription connId smpServer Nothing ntfServer NASNew
ts <- liftIO getCurrentTime
withStore' c $ \db -> createNtfSubscription db newSub (NtfSubSMPAction NSASmpKey) ts
withStore' c $ \db -> createNtfSubscription db newSub $ NtfSubSMPAction NSASmpKey
addNtfSMPWorker smpServer
(Just (sub@NtfSubscription {ntfSubStatus, ntfServer = subNtfServer}, action_)) -> do
case action_ of
-- action was set to NULL after worker internal error
Nothing -> resetSubscription
Just (action, _)
-- subscription was marked for deletion / is being deleted
| isDeleteNtfSubAction action -> do
if ntfSubStatus == NASNew || ntfSubStatus == NASOff || ntfSubStatus == NASDeleted
then resetSubscription
else withNtfServer c $ \ntfServer -> do
ts <- liftIO getCurrentTime
withStore' c $ \db ->
supervisorUpdateNtfSubscription db sub {ntfServer} (NtfSubNTFAction NSACreate) ts
addNtfNTFWorker ntfServer
| otherwise -> case action of
NtfSubNTFAction _ -> addNtfNTFWorker subNtfServer
NtfSubSMPAction _ -> addNtfSMPWorker smpServer
(Just (sub@NtfSubscription {ntfSubStatus, ntfServer = subNtfServer, smpServer = smpServer', ntfQueueId}, action_)) -> do
case (clientNtfCreds, ntfQueueId) of
(Just ClientNtfCreds {notifierId}, Just ntfQueueId')
| sameSrvAddr smpServer smpServer' && notifierId == ntfQueueId' -> create
| otherwise -> rotate
(Nothing, Nothing) -> create
_ -> rotate
where
create :: m ()
create = case action_ of
-- action was set to NULL after worker internal error
Nothing -> resetSubscription
Just (action, _)
-- subscription was marked for deletion / is being deleted
| isDeleteNtfSubAction action -> do
if ntfSubStatus == NASNew || ntfSubStatus == NASOff || ntfSubStatus == NASDeleted
then resetSubscription
else withNtfServer c $ \ntfServer -> do
withStore' c $ \db -> supervisorUpdateNtfSub db sub {ntfServer} (NtfSubNTFAction NSACreate)
addNtfNTFWorker ntfServer
| otherwise -> case action of
NtfSubNTFAction _ -> addNtfNTFWorker subNtfServer
NtfSubSMPAction _ -> addNtfSMPWorker smpServer
rotate :: m ()
rotate = do
withStore' c $ \db -> supervisorUpdateNtfSub db sub (NtfSubNTFAction NSARotate)
addNtfNTFWorker subNtfServer
resetSubscription :: m ()
resetSubscription =
withNtfServer c $ \ntfServer -> do
ts <- liftIO getCurrentTime
withStore' c $ \db ->
supervisorUpdateNtfSubscription db sub {ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew} (NtfSubSMPAction NSASmpKey) ts
let sub' = sub {ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew}
withStore' c $ \db -> supervisorUpdateNtfSub db sub' (NtfSubSMPAction NSASmpKey)
addNtfSMPWorker smpServer
NSCDelete -> do
sub_ <- withStore' c $ \db -> do
ts <- liftIO getCurrentTime
supervisorUpdateNtfSubAction db connId (NtfSubNTFAction NSADelete) ts
supervisorUpdateNtfAction db connId (NtfSubNTFAction NSADelete)
getNtfSubscription db connId
logInfo $ "processNtfSub, NSCDelete - sub_ = " <> tshow sub_
case sub_ of
@@ -128,14 +133,11 @@ processNtfSub c (connId, cmd) = do
withStore' c (`getPrimaryRcvQueue` connId) >>= \case
Right rq@RcvQueue {server = smpServer} -> do
logInfo $ "processNtfSub, NSCSmpDelete - rq = " <> tshow rq
ts <- liftIO getCurrentTime
withStore' c $ \db -> supervisorUpdateNtfSubAction db connId (NtfSubSMPAction NSASmpDelete) ts
withStore' c $ \db -> supervisorUpdateNtfAction db connId (NtfSubSMPAction NSASmpDelete)
addNtfSMPWorker smpServer
_ -> notifyInternalError c connId "NSCSmpDelete - no rcv queue"
NSCNtfWorker ntfServer ->
addNtfNTFWorker ntfServer
NSCNtfSMPWorker smpServer ->
addNtfSMPWorker smpServer
NSCNtfWorker ntfServer -> addNtfNTFWorker ntfServer
NSCNtfSMPWorker smpServer -> addNtfSMPWorker smpServer
where
addNtfNTFWorker = addWorker ntfWorkers runNtfWorker
addNtfSMPWorker = addWorker ntfSMPWorkers runNtfSMPWorker
@@ -214,16 +216,25 @@ runNtfWorker c srv doWork = do
_ -> workerInternalError c connId "NSACheck - no active token"
NSADelete -> case ntfSubId of
Just nSubId ->
(getNtfToken >>= \tkn -> forM_ tkn $ agentNtfDeleteSubscription c nSubId)
`E.finally` carryOnWithDeletion
Nothing -> carryOnWithDeletion
(getNtfToken >>= mapM_ (agentNtfDeleteSubscription c nSubId))
`E.finally` continueDeletion
_ -> continueDeletion
where
carryOnWithDeletion :: m ()
carryOnWithDeletion = do
withStore' c $ \db ->
updateNtfSubscription db sub {ntfSubId = Nothing, ntfSubStatus = NASOff} (NtfSubSMPAction NSASmpDelete) ts
continueDeletion = do
let sub' = sub {ntfSubId = Nothing, ntfSubStatus = NASOff}
withStore' c $ \db -> updateNtfSubscription db sub' (NtfSubSMPAction NSASmpDelete) ts
ns <- asks ntfSupervisor
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCNtfSMPWorker smpServer)
NSARotate -> case ntfSubId of
Just nSubId ->
(getNtfToken >>= mapM_ (agentNtfDeleteSubscription c nSubId))
`E.finally` deleteCreate
_ -> deleteCreate
where
deleteCreate = do
withStore' c $ \db -> deleteNtfSubscription db connId
ns <- asks ntfSupervisor
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCCreate)
where
updateSubNextCheck ts toStatus = do
checkInterval <- asks $ ntfSubCheckInterval . config
@@ -276,7 +287,7 @@ runNtfSMPWorker c srv doWork = do
rq_ <- withStore' c $ \db -> do
setRcvQueueNtfCreds db connId Nothing
getPrimaryRcvQueue db connId
forM_ rq_ $ \rq -> disableQueueNotifications c rq
mapM_ (disableQueueNotifications c) rq_
withStore' c $ \db -> deleteNtfSubscription db connId
rescheduleAction :: AgentMonad m => TMVar () -> UTCTime -> UTCTime -> m Bool
@@ -346,7 +357,7 @@ closeNtfSupervisor ns = do
cancelNtfWorkers_ :: TMap (ProtocolServer s) (TMVar (), Async ()) -> IO ()
cancelNtfWorkers_ wsVar = do
ws <- atomically $ stateTVar wsVar (,M.empty)
forM_ ws $ uninterruptibleCancel . snd
mapM_ (uninterruptibleCancel . snd) ws
getNtfServer :: AgentMonad m => AgentClient -> m (Maybe NtfServer)
getNtfServer c = do

View File

@@ -345,20 +345,18 @@ aCommandTag = \case
ERR _ -> ERR_
SUSPENDED -> SUSPENDED_
data SwitchPhase = SPStarted | SPConfirmed | SPTested | SPCompleted
data SwitchPhase = SPStarted | SPConfirmed | SPCompleted
deriving (Eq, Show)
instance StrEncoding SwitchPhase where
strEncode = \case
SPStarted -> "started"
SPConfirmed -> "confirmed"
SPTested -> "tested"
SPCompleted -> "completed"
strP =
A.takeTill (== ' ') >>= \case
"started" -> pure SPStarted
"confirmed" -> pure SPConfirmed
"tested" -> pure SPTested
"completed" -> pure SPCompleted
_ -> fail "bad SwitchPhase"
@@ -546,8 +544,6 @@ data AgentMessageType
| AM_QKEY_
| AM_QUSE_
| AM_QTEST_
| AM_QDEL_
| AM_QEND_
deriving (Eq, Show)
instance Encoding AgentMessageType where
@@ -561,8 +557,6 @@ instance Encoding AgentMessageType where
AM_QKEY_ -> "QK"
AM_QUSE_ -> "QU"
AM_QTEST_ -> "QT"
AM_QDEL_ -> "QD"
AM_QEND_ -> "QE"
smpP =
A.anyChar >>= \case
'C' -> pure AM_CONN_INFO
@@ -576,8 +570,6 @@ instance Encoding AgentMessageType where
'K' -> pure AM_QKEY_
'U' -> pure AM_QUSE_
'T' -> pure AM_QTEST_
'D' -> pure AM_QDEL_
'E' -> pure AM_QEND_
_ -> fail "bad AgentMessageType"
_ -> fail "bad AgentMessageType"
@@ -598,8 +590,6 @@ agentMessageType = \case
QKEY _ -> AM_QKEY_
QUSE _ -> AM_QUSE_
QTEST _ -> AM_QTEST_
QDEL _ -> AM_QDEL_
QEND _ -> AM_QEND_
data APrivHeader = APrivHeader
{ -- | sequential ID assigned by the sending agent
@@ -622,8 +612,6 @@ data AMsgType
| QKEY_
| QUSE_
| QTEST_
| QDEL_
| QEND_
deriving (Eq)
instance Encoding AMsgType where
@@ -635,8 +623,6 @@ instance Encoding AMsgType where
QKEY_ -> "QK"
QUSE_ -> "QU"
QTEST_ -> "QT"
QDEL_ -> "QD"
QEND_ -> "QE"
smpP =
A.anyChar >>= \case
'H' -> pure HELLO_
@@ -648,8 +634,6 @@ instance Encoding AMsgType where
'K' -> pure QKEY_
'U' -> pure QUSE_
'T' -> pure QTEST_
'D' -> pure QDEL_
'E' -> pure QEND_
_ -> fail "bad AMsgType"
_ -> fail "bad AMsgType"
@@ -669,12 +653,8 @@ data AMessage
QKEY (L.NonEmpty (SMPQueueInfo, SndPublicVerifyKey))
| -- inform that the queues are ready to use (sent by recipient)
QUSE (L.NonEmpty (SndQAddr, Bool))
| -- sent by the sender to test new queues
| -- sent by the sender to test new queues and to complete switching
QTEST (L.NonEmpty SndQAddr)
| -- inform that the queues will be deleted (sent recipient once message received via the new queue)
QDEL (L.NonEmpty SndQAddr)
| -- sent by sender to confirm that no more messages will be sent to the queue
QEND (L.NonEmpty SndQAddr)
deriving (Show)
type SndQAddr = (SMPServer, SMP.SenderId)
@@ -688,8 +668,6 @@ instance Encoding AMessage where
QKEY qs -> smpEncode (QKEY_, qs)
QUSE qs -> smpEncode (QUSE_, qs)
QTEST qs -> smpEncode (QTEST_, qs)
QDEL qs -> smpEncode (QDEL_, qs)
QEND qs -> smpEncode (QEND_, qs)
smpP =
smpP
>>= \case
@@ -700,8 +678,6 @@ instance Encoding AMessage where
QKEY_ -> QKEY <$> smpP
QUSE_ -> QUSE <$> smpP
QTEST_ -> QTEST <$> smpP
QDEL_ -> QDEL <$> smpP
QEND_ -> QEND <$> smpP
instance forall m. ConnectionModeI m => StrEncoding (ConnectionRequestUri m) where
strEncode = \case

View File

@@ -65,10 +65,8 @@ data RcvQueue = RcvQueue
status :: QueueStatus,
-- | database queue ID (within connection), can be Nothing for old queues
dbQueueId :: Int64,
-- | True for a primary queue of the connection
-- | True for a primary or a next primary queue of the connection (next if dbReplaceQueueId is set)
primary :: Bool,
-- | True for the next primary queue
nextPrimary :: Bool,
-- | database queue ID to replace, Nothing if this queue is not replacing another, `Just Nothing` is used for replacing old queues
dbReplaceQueueId :: Maybe Int64,
-- | SMP client version
@@ -106,10 +104,8 @@ data SndQueue = SndQueue
status :: QueueStatus,
-- | database queue ID (within connection), can be Nothing for old queues
dbQueueId :: Int64,
-- | True for a primary queue of the connection
-- | True for a primary or a next primary queue of the connection (next if dbReplaceQueueId is set)
primary :: Bool,
-- | True for the next primary queue
nextPrimary :: Bool,
-- | ID of the queue this one is replacing
dbReplaceQueueId :: Maybe Int64,
-- | SMP client version
@@ -138,7 +134,11 @@ findQ = find . sameQueue
{-# INLINE findQ #-}
removeQ :: SMPQueue q => (SMPServer, SMP.QueueId) -> NonEmpty q -> Maybe (q, [q])
removeQ addr qs = case L.break (sameQueue addr) qs of
removeQ = removeQP . sameQueue
{-# INLINE removeQ #-}
removeQP :: (q -> Bool) -> NonEmpty q -> Maybe (q, [q])
removeQP p qs = case L.break p qs of
(_, []) -> Nothing
(qs1, q : qs2) -> Just (q, qs1 <> qs2)
@@ -224,7 +224,8 @@ data ConnData = ConnData
{ connId :: ConnId,
connAgentVersion :: Version,
enableNtfs :: Bool,
duplexHandshake :: Maybe Bool -- added in agent protocol v2
duplexHandshake :: Maybe Bool, -- added in agent protocol v2
deleted :: Bool
}
deriving (Eq, Show)
@@ -272,6 +273,7 @@ data InternalCommand
| ICAckDel SMP.RecipientId MsgId InternalId
| ICAllowSecure SMP.RecipientId SMP.SndPublicVerifyKey
| ICDuplexSecure SMP.RecipientId SMP.SndPublicVerifyKey
| ICDeleteConn
| ICQSecure SMP.RecipientId SMP.SndPublicVerifyKey
| ICQDelete SMP.RecipientId
@@ -280,6 +282,7 @@ data InternalCommandTag
| ICAckDel_
| ICAllowSecure_
| ICDuplexSecure_
| ICDeleteConn_
| ICQSecure_
| ICQDelete_
deriving (Show)
@@ -290,6 +293,7 @@ instance StrEncoding InternalCommand where
ICAckDel rId srvMsgId mId -> strEncode (ICAckDel_, rId, srvMsgId, mId)
ICAllowSecure rId sndKey -> strEncode (ICAllowSecure_, rId, sndKey)
ICDuplexSecure rId sndKey -> strEncode (ICDuplexSecure_, rId, sndKey)
ICDeleteConn -> strEncode ICDeleteConn_
ICQSecure rId senderKey -> strEncode (ICQSecure_, rId, senderKey)
ICQDelete rId -> strEncode (ICQDelete_, rId)
strP =
@@ -298,6 +302,7 @@ instance StrEncoding InternalCommand where
ICAckDel_ -> ICAckDel <$> _strP <*> _strP <*> _strP
ICAllowSecure_ -> ICAllowSecure <$> _strP <*> _strP
ICDuplexSecure_ -> ICDuplexSecure <$> _strP <*> _strP
ICDeleteConn_ -> pure ICDeleteConn
ICQSecure_ -> ICQSecure <$> _strP <*> _strP
ICQDelete_ -> ICQDelete <$> _strP
@@ -307,6 +312,7 @@ instance StrEncoding InternalCommandTag where
ICAckDel_ -> "ACK_DEL"
ICAllowSecure_ -> "ALLOW_SECURE"
ICDuplexSecure_ -> "DUPLEX_SECURE"
ICDeleteConn_ -> "DELETE_CONN"
ICQSecure_ -> "QSECURE"
ICQDelete_ -> "QDELETE"
strP =
@@ -315,6 +321,7 @@ instance StrEncoding InternalCommandTag where
"ACK_DEL" -> pure ICAckDel_
"ALLOW_SECURE" -> pure ICAllowSecure_
"DUPLEX_SECURE" -> pure ICDuplexSecure_
"DELETE_CONN" -> pure ICDeleteConn_
"QSECURE" -> pure ICQSecure_
"QDELETE" -> pure ICQDelete_
_ -> fail "bad InternalCommandTag"
@@ -330,6 +337,7 @@ internalCmdTag = \case
ICAckDel {} -> ICAckDel_
ICAllowSecure {} -> ICAllowSecure_
ICDuplexSecure {} -> ICDuplexSecure_
ICDeleteConn -> ICDeleteConn_
ICQSecure {} -> ICQSecure_
ICQDelete _ -> ICQDelete_

View File

@@ -34,7 +34,9 @@ module Simplex.Messaging.Agent.Store.SQLite
createRcvConn,
createSndConn,
getConn,
getAnyConn,
getConnData,
setConnDeleted,
getRcvConn,
deleteConn,
upgradeRcvConnToDuplex,
@@ -99,8 +101,8 @@ module Simplex.Messaging.Agent.Store.SQLite
-- Notification subscription persistence
getNtfSubscription,
createNtfSubscription,
supervisorUpdateNtfSubscription,
supervisorUpdateNtfSubAction,
supervisorUpdateNtfSub,
supervisorUpdateNtfAction,
updateNtfSubscription,
setNullNtfSubscriptionAction,
deleteNtfSubscription,
@@ -343,19 +345,12 @@ createSndConn db gVar cData@ConnData {connAgentVersion, enableNtfs, duplexHandsh
DB.execute db "INSERT INTO connections (conn_id, conn_mode, smp_agent_version, enable_ntfs, duplex_handshake) VALUES (?, ?, ?, ?, ?)" (connId, SCMInvitation, connAgentVersion, enableNtfs, duplexHandshake)
void $ insertSndQueue_ db connId q
getRcvConn :: DB.Connection -> SMPServer -> SMP.RecipientId -> IO (Either StoreError SomeConn)
getRcvConn db ProtocolServer {host, port} rcvId =
DB.queryNamed
db
[sql|
SELECT q.conn_id
FROM rcv_queues q
WHERE q.host = :host AND q.port = :port AND q.rcv_id = :rcv_id;
|]
[":host" := host, ":port" := port, ":rcv_id" := rcvId]
>>= \case
[Only connId] -> getConn db connId
_ -> pure $ Left SEConnNotFound
getRcvConn :: DB.Connection -> SMPServer -> SMP.RecipientId -> IO (Either StoreError (RcvQueue, SomeConn))
getRcvConn db ProtocolServer {host, port} rcvId = runExceptT $ do
rq@RcvQueue {connId} <-
ExceptT . firstRow toRcvQueue SEConnNotFound $
DB.query db (rcvQueueQuery <> " WHERE q.host = ? AND q.port = ? AND q.rcv_id = ?") (host, port, rcvId)
(rq,) <$> ExceptT (getConn db connId)
deleteConn :: DB.Connection -> ConnId -> IO ()
deleteConn db connId =
@@ -446,19 +441,19 @@ setSndQueueStatus db SndQueue {sndId, server = ProtocolServer {host, port}} stat
setRcvQueuePrimary :: DB.Connection -> ConnId -> RcvQueue -> IO ()
setRcvQueuePrimary db connId RcvQueue {dbQueueId} = do
DB.execute db "UPDATE rcv_queues SET rcv_primary = ?, next_rcv_primary = ? WHERE conn_id = ?" (False, False, connId)
DB.execute db "UPDATE rcv_queues SET rcv_primary = ? WHERE conn_id = ?" (False, connId)
DB.execute
db
"UPDATE rcv_queues SET rcv_primary = ?, next_rcv_primary = ?, replace_rcv_queue_id = ? WHERE conn_id = ? AND rcv_queue_id = ?"
(True, False, Nothing :: Maybe Int64, connId, dbQueueId)
"UPDATE rcv_queues SET rcv_primary = ?, replace_rcv_queue_id = ? WHERE conn_id = ? AND rcv_queue_id = ?"
(True, Nothing :: Maybe Int64, connId, dbQueueId)
setSndQueuePrimary :: DB.Connection -> ConnId -> SndQueue -> IO ()
setSndQueuePrimary db connId SndQueue {dbQueueId} = do
DB.execute db "UPDATE snd_queues SET snd_primary = ?, next_snd_primary = ? WHERE conn_id = ?" (False, False, connId)
DB.execute db "UPDATE snd_queues SET snd_primary = ? WHERE conn_id = ?" (False, connId)
DB.execute
db
"UPDATE snd_queues SET snd_primary = ?, next_snd_primary = ?, replace_snd_queue_id = ? WHERE conn_id = ? AND snd_queue_id = ?"
(True, False, Nothing :: Maybe Int64, connId, dbQueueId)
"UPDATE snd_queues SET snd_primary = ?, replace_snd_queue_id = ? WHERE conn_id = ? AND snd_queue_id = ?"
(True, Nothing :: Maybe Int64, connId, dbQueueId)
deleteConnRcvQueue :: DB.Connection -> ConnId -> RcvQueue -> IO ()
deleteConnRcvQueue db connId RcvQueue {dbQueueId} =
@@ -475,7 +470,7 @@ getPrimaryRcvQueue db connId =
getRcvQueue :: DB.Connection -> ConnId -> SMPServer -> SMP.RecipientId -> IO (Either StoreError RcvQueue)
getRcvQueue db connId (SMPServer host port _) rcvId =
firstRow (toRcvQueue connId) SEConnNotFound $
firstRow toRcvQueue SEConnNotFound $
DB.query db (rcvQueueQuery <> "WHERE q.conn_id = ? AND q.host = ? AND q.port = ? AND q.rcv_id = ?") (connId, host, port, rcvId)
setRcvQueueNtfCreds :: DB.Connection -> ConnId -> Maybe ClientNtfCreds -> IO ()
@@ -949,9 +944,10 @@ getNtfSubscription db connId =
_ -> Nothing
in (NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus}, action)
createNtfSubscription :: DB.Connection -> NtfSubscription -> NtfSubAction -> NtfActionTs -> IO ()
createNtfSubscription db ntfSubscription action actionTs = do
createNtfSubscription :: DB.Connection -> NtfSubscription -> NtfSubAction -> IO ()
createNtfSubscription db ntfSubscription action = do
let NtfSubscription {connId, smpServer = (SMPServer host port _), ntfQueueId, ntfServer = (NtfServer ntfHost ntfPort _), ntfSubId, ntfSubStatus} = ntfSubscription
actionTs <- liftIO getCurrentTime
DB.execute
db
[sql|
@@ -966,9 +962,9 @@ createNtfSubscription db ntfSubscription action actionTs = do
where
(ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction action
supervisorUpdateNtfSubscription :: DB.Connection -> NtfSubscription -> NtfSubAction -> NtfActionTs -> IO ()
supervisorUpdateNtfSubscription db NtfSubscription {connId, ntfQueueId, ntfServer = (NtfServer ntfHost ntfPort _), ntfSubId, ntfSubStatus} action actionTs = do
updatedAt <- getCurrentTime
supervisorUpdateNtfSub :: DB.Connection -> NtfSubscription -> NtfSubAction -> IO ()
supervisorUpdateNtfSub db NtfSubscription {connId, ntfQueueId, ntfServer = (NtfServer ntfHost ntfPort _), ntfSubId, ntfSubStatus} action = do
ts <- getCurrentTime
DB.execute
db
[sql|
@@ -976,13 +972,13 @@ supervisorUpdateNtfSubscription db NtfSubscription {connId, ntfQueueId, ntfServe
SET smp_ntf_id = ?, ntf_host = ?, ntf_port = ?, ntf_sub_id = ?, ntf_sub_status = ?, ntf_sub_action = ?, ntf_sub_smp_action = ?, ntf_sub_action_ts = ?, updated_by_supervisor = ?, updated_at = ?
WHERE conn_id = ?
|]
((ntfQueueId, ntfHost, ntfPort, ntfSubId) :. (ntfSubStatus, ntfSubAction, ntfSubSMPAction, actionTs, True, updatedAt, connId))
((ntfQueueId, ntfHost, ntfPort, ntfSubId) :. (ntfSubStatus, ntfSubAction, ntfSubSMPAction, ts, True, ts, connId))
where
(ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction action
supervisorUpdateNtfSubAction :: DB.Connection -> ConnId -> NtfSubAction -> NtfActionTs -> IO ()
supervisorUpdateNtfSubAction db connId action actionTs = do
updatedAt <- getCurrentTime
supervisorUpdateNtfAction :: DB.Connection -> ConnId -> NtfSubAction -> IO ()
supervisorUpdateNtfAction db connId action = do
ts <- getCurrentTime
DB.execute
db
[sql|
@@ -990,7 +986,7 @@ supervisorUpdateNtfSubAction db connId action actionTs = do
SET ntf_sub_action = ?, ntf_sub_smp_action = ?, ntf_sub_action_ts = ?, updated_by_supervisor = ?, updated_at = ?
WHERE conn_id = ?
|]
(ntfSubAction, ntfSubSMPAction, actionTs, True, updatedAt, connId)
(ntfSubAction, ntfSubSMPAction, ts, True, ts, connId)
where
(ntfSubAction, ntfSubSMPAction) = ntfSubAndSMPAction action
@@ -1285,9 +1281,9 @@ insertRcvQueue_ db connId' RcvQueue {..} = do
db
[sql|
INSERT INTO rcv_queues
(host, port, rcv_id, conn_id, rcv_private_key, rcv_dh_secret, e2e_priv_key, e2e_dh_secret, snd_id, status, rcv_queue_id, rcv_primary, next_rcv_primary, replace_rcv_queue_id, smp_client_version) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);
(host, port, rcv_id, conn_id, rcv_private_key, rcv_dh_secret, e2e_priv_key, e2e_dh_secret, snd_id, status, rcv_queue_id, rcv_primary, replace_rcv_queue_id, smp_client_version) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?);
|]
((host server, port server, rcvId, connId', rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret) :. (sndId, status, qId, primary, nextPrimary, dbReplaceQueueId, smpClientVersion))
((host server, port server, rcvId, connId', rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret) :. (sndId, status, qId, primary, dbReplaceQueueId, smpClientVersion))
pure qId
-- * createSndConn helpers
@@ -1299,9 +1295,9 @@ insertSndQueue_ db connId' SndQueue {..} = do
db
[sql|
INSERT INTO snd_queues
(host, port, snd_id, conn_id, snd_public_key, snd_private_key, e2e_pub_key, e2e_dh_secret, status, snd_queue_id, snd_primary, next_snd_primary, replace_snd_queue_id, smp_client_version) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?);
(host, port, snd_id, conn_id, snd_public_key, snd_private_key, e2e_pub_key, e2e_dh_secret, status, snd_queue_id, snd_primary, replace_snd_queue_id, smp_client_version) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?);
|]
((host server, port server, sndId, connId', sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret) :. (status, qId, primary, nextPrimary, dbReplaceQueueId, smpClientVersion))
((host server, port server, sndId, connId', sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret) :. (status, qId, primary, dbReplaceQueueId, smpClientVersion))
pure qId
newQueueId_ :: [Only Int64] -> Int64
@@ -1311,62 +1307,71 @@ newQueueId_ (Only maxId : _) = maxId + 1
-- * getConn helpers
getConn :: DB.Connection -> ConnId -> IO (Either StoreError SomeConn)
getConn dbConn connId =
getConn db connId = getAnyConn db connId False
getAnyConn :: DB.Connection -> ConnId -> Bool -> IO (Either StoreError SomeConn)
getAnyConn dbConn connId deleted' =
getConnData dbConn connId >>= \case
Nothing -> pure $ Left SEConnNotFound
Just (cData, cMode) -> do
rQ <- getRcvQueuesByConnId_ dbConn connId
sQ <- getSndQueuesByConnId_ dbConn connId
pure $ case (rQ, sQ, cMode) of
(Just rqs, Just sqs, CMInvitation) -> Right $ SomeConn SCDuplex (DuplexConnection cData rqs sqs)
(Just (rq :| _), Nothing, CMInvitation) -> Right $ SomeConn SCRcv (RcvConnection cData rq)
(Nothing, Just (sq :| _), CMInvitation) -> Right $ SomeConn SCSnd (SndConnection cData sq)
(Just (rq :| _), Nothing, CMContact) -> Right $ SomeConn SCContact (ContactConnection cData rq)
(Nothing, Nothing, _) -> Right $ SomeConn SCNew (NewConnection cData)
_ -> Left SEConnNotFound
Just (cData@ConnData {deleted}, cMode)
| deleted /= deleted' -> pure $ Left SEConnNotFound
| otherwise -> do
rQ <- getRcvQueuesByConnId_ dbConn connId
sQ <- getSndQueuesByConnId_ dbConn connId
pure $ case (rQ, sQ, cMode) of
(Just rqs, Just sqs, CMInvitation) -> Right $ SomeConn SCDuplex (DuplexConnection cData rqs sqs)
(Just (rq :| _), Nothing, CMInvitation) -> Right $ SomeConn SCRcv (RcvConnection cData rq)
(Nothing, Just (sq :| _), CMInvitation) -> Right $ SomeConn SCSnd (SndConnection cData sq)
(Just (rq :| _), Nothing, CMContact) -> Right $ SomeConn SCContact (ContactConnection cData rq)
(Nothing, Nothing, _) -> Right $ SomeConn SCNew (NewConnection cData)
_ -> Left SEConnNotFound
getConnData :: DB.Connection -> ConnId -> IO (Maybe (ConnData, ConnectionMode))
getConnData dbConn connId' =
maybeFirstRow cData $ DB.query dbConn "SELECT conn_id, conn_mode, smp_agent_version, enable_ntfs, duplex_handshake FROM connections WHERE conn_id = ?;" (Only connId')
maybeFirstRow cData $ DB.query dbConn "SELECT conn_id, conn_mode, smp_agent_version, enable_ntfs, duplex_handshake, deleted FROM connections WHERE conn_id = ?;" (Only connId')
where
cData (connId, cMode, connAgentVersion, enableNtfs_, duplexHandshake) = (ConnData {connId, connAgentVersion, enableNtfs = fromMaybe True enableNtfs_, duplexHandshake}, cMode)
cData (connId, cMode, connAgentVersion, enableNtfs_, duplexHandshake, deleted) = (ConnData {connId, connAgentVersion, enableNtfs = fromMaybe True enableNtfs_, duplexHandshake, deleted}, cMode)
setConnDeleted :: DB.Connection -> ConnId -> IO ()
setConnDeleted db connId = DB.execute db "UPDATE connections SET deleted = ? WHERE conn_id = ?" (True, connId)
-- | returns all connection queues, the first queue is the primary one
getRcvQueuesByConnId_ :: DB.Connection -> ConnId -> IO (Maybe (NonEmpty RcvQueue))
getRcvQueuesByConnId_ db connId =
L.nonEmpty . sortBy primaryFirst . map (toRcvQueue connId)
L.nonEmpty . sortBy primaryFirst . map toRcvQueue
<$> DB.query db (rcvQueueQuery <> "WHERE q.conn_id = ?") (Only connId)
where
primaryFirst RcvQueue {primary = p} RcvQueue {primary = p'} = compare (Down p) (Down p')
primaryFirst RcvQueue {primary = p, dbReplaceQueueId = i} RcvQueue {primary = p', dbReplaceQueueId = i'} =
-- the current primary queue is ordered first, the next primary - second
compare (Down p) (Down p') <> compare i i'
rcvQueueQuery :: Query
rcvQueueQuery =
[sql|
SELECT s.key_hash, q.host, q.port, q.rcv_id, q.rcv_private_key, q.rcv_dh_secret,
SELECT s.key_hash, q.conn_id, q.host, q.port, q.rcv_id, q.rcv_private_key, q.rcv_dh_secret,
q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.status,
q.rcv_queue_id, q.rcv_primary, q.next_rcv_primary, q.replace_rcv_queue_id, q.smp_client_version,
q.rcv_queue_id, q.rcv_primary, q.replace_rcv_queue_id, q.smp_client_version,
q.ntf_public_key, q.ntf_private_key, q.ntf_id, q.rcv_ntf_dh_secret
FROM rcv_queues q
INNER JOIN servers s ON q.host = s.host AND q.port = s.port
|]
toRcvQueue ::
ConnId ->
(C.KeyHash, NonEmpty TransportHost, ServiceName, SMP.RecipientId, SMP.RcvPrivateSignKey, SMP.RcvDhSecret, C.PrivateKeyX25519, Maybe C.DhSecretX25519, SMP.SenderId, QueueStatus)
:. (Int64, Bool, Bool, Maybe Int64, Maybe Version)
(C.KeyHash, ConnId, NonEmpty TransportHost, ServiceName, SMP.RecipientId, SMP.RcvPrivateSignKey, SMP.RcvDhSecret, C.PrivateKeyX25519, Maybe C.DhSecretX25519, SMP.SenderId, QueueStatus)
:. (Int64, Bool, Maybe Int64, Maybe Version)
:. (Maybe SMP.NtfPublicVerifyKey, Maybe SMP.NtfPrivateSignKey, Maybe SMP.NotifierId, Maybe RcvNtfDhSecret) ->
RcvQueue
toRcvQueue connId ((keyHash, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status) :. (dbQueueId, primary, nextPrimary, dbReplaceQueueId, smpClientVersion_) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)) =
toRcvQueue ((keyHash, connId, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status) :. (dbQueueId, primary, dbReplaceQueueId, smpClientVersion_) :. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)) =
let server = SMPServer host port keyHash
smpClientVersion = fromMaybe 1 smpClientVersion_
clientNtfCreds = case (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_) of
(Just ntfPublicKey, Just ntfPrivateKey, Just notifierId, Just rcvNtfDhSecret) -> Just $ ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret}
_ -> Nothing
in RcvQueue {connId, server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, dbQueueId, primary, nextPrimary, dbReplaceQueueId, smpClientVersion, clientNtfCreds}
in RcvQueue {connId, server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, status, dbQueueId, primary, dbReplaceQueueId, smpClientVersion, clientNtfCreds}
getRcvQueueById_ :: DB.Connection -> ConnId -> Int64 -> IO (Either StoreError RcvQueue)
getRcvQueueById_ db connId dbRcvId =
firstRow (toRcvQueue connId) SEConnNotFound $
firstRow toRcvQueue SEConnNotFound $
DB.query db (rcvQueueQuery <> " WHERE conn_id = ? AND rcv_queue_id = ?") (connId, dbRcvId)
-- | returns all connection queues, the first queue is the primary one
@@ -1376,17 +1381,19 @@ getSndQueuesByConnId_ dbConn connId =
<$> DB.query
dbConn
[sql|
SELECT s.key_hash, q.host, q.port, q.snd_id, q.snd_public_key, q.snd_private_key, q.e2e_pub_key, q.e2e_dh_secret, q.status, q.snd_queue_id, q.snd_primary, q.next_snd_primary, q.replace_snd_queue_id, q.smp_client_version
SELECT s.key_hash, q.host, q.port, q.snd_id, q.snd_public_key, q.snd_private_key, q.e2e_pub_key, q.e2e_dh_secret, q.status, q.snd_queue_id, q.snd_primary, q.replace_snd_queue_id, q.smp_client_version
FROM snd_queues q
INNER JOIN servers s ON q.host = s.host AND q.port = s.port
WHERE q.conn_id = ?;
|]
(Only connId)
where
sndQueue ((keyHash, host, port, sndId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status) :. (dbQueueId, primary, nextPrimary, dbReplaceQueueId, smpClientVersion)) =
sndQueue ((keyHash, host, port, sndId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status) :. (dbQueueId, primary, dbReplaceQueueId, smpClientVersion)) =
let server = SMPServer host port keyHash
in SndQueue {connId, server, sndId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status, dbQueueId, primary, nextPrimary, dbReplaceQueueId, smpClientVersion}
primaryFirst SndQueue {primary = p} SndQueue {primary = p'} = compare (Down p) (Down p')
in SndQueue {connId, server, sndId, sndPublicKey, sndPrivateKey, e2ePubKey, e2eDhSecret, status, dbQueueId, primary, dbReplaceQueueId, smpClientVersion}
primaryFirst SndQueue {primary = p, dbReplaceQueueId = i} SndQueue {primary = p', dbReplaceQueueId = i'} =
-- the current primary queue is ordered first, the next primary - second
compare (Down p) (Down p') <> compare i i'
-- * updateRcvIds helpers

View File

@@ -18,9 +18,6 @@ CREATE UNIQUE INDEX idx_rcv_queue_id ON rcv_queues (conn_id, rcv_queue_id);
ALTER TABLE rcv_queues ADD COLUMN rcv_primary INTEGER CHECK (rcv_primary NOT NULL);
UPDATE rcv_queues SET rcv_primary = 1;
ALTER TABLE rcv_queues ADD COLUMN next_rcv_primary INTEGER CHECK (next_rcv_primary NOT NULL);
UPDATE rcv_queues SET next_rcv_primary = 0;
ALTER TABLE rcv_queues ADD COLUMN replace_rcv_queue_id INTEGER NULL;
-- snd_queues
@@ -31,11 +28,12 @@ CREATE UNIQUE INDEX idx_snd_queue_id ON snd_queues (conn_id, snd_queue_id);
ALTER TABLE snd_queues ADD COLUMN snd_primary INTEGER CHECK (snd_primary NOT NULL);
UPDATE snd_queues SET snd_primary = 1;
ALTER TABLE snd_queues ADD COLUMN next_snd_primary INTEGER CHECK (next_snd_primary NOT NULL);
UPDATE snd_queues SET next_snd_primary = 0;
ALTER TABLE snd_queues ADD COLUMN replace_snd_queue_id INTEGER NULL;
-- connections
ALTER TABLE connections ADD COLUMN deleted INTEGER DEFAULT 0 CHECK (deleted NOT NULL);
UPDATE connections SET deleted = 0;
-- messages
CREATE TABLE snd_message_deliveries (
snd_message_delivery_id INTEGER PRIMARY KEY AUTOINCREMENT,

View File

@@ -21,7 +21,8 @@ CREATE TABLE connections(
smp_agent_version INTEGER NOT NULL DEFAULT 1
,
duplex_handshake INTEGER NULL DEFAULT 0,
enable_ntfs INTEGER
enable_ntfs INTEGER,
deleted INTEGER DEFAULT 0 CHECK(deleted NOT NULL)
) WITHOUT ROWID;
CREATE TABLE rcv_queues(
host TEXT NOT NULL,
@@ -43,7 +44,6 @@ CREATE TABLE rcv_queues(
rcv_ntf_dh_secret BLOB,
rcv_queue_id INTEGER CHECK(rcv_queue_id NOT NULL),
rcv_primary INTEGER CHECK(rcv_primary NOT NULL),
next_rcv_primary INTEGER CHECK(next_rcv_primary NOT NULL),
replace_rcv_queue_id INTEGER NULL,
PRIMARY KEY(host, port, rcv_id),
FOREIGN KEY(host, port) REFERENCES servers
@@ -64,7 +64,6 @@ CREATE TABLE snd_queues(
e2e_pub_key BLOB,
snd_queue_id INTEGER CHECK(snd_queue_id NOT NULL),
snd_primary INTEGER CHECK(snd_primary NOT NULL),
next_snd_primary INTEGER CHECK(next_snd_primary NOT NULL),
replace_snd_queue_id INTEGER NULL,
PRIMARY KEY(host, port, snd_id),
FOREIGN KEY(host, port) REFERENCES servers

View File

@@ -87,6 +87,7 @@ isDeleteNtfSubAction = \case
NSACreate -> False
NSACheck -> False
NSADelete -> True
NSARotate -> True
NtfSubSMPAction a -> case a of
NSASmpKey -> False
NSASmpDelete -> True
@@ -97,6 +98,7 @@ data NtfSubNTFAction
= NSACreate
| NSACheck
| NSADelete
| NSARotate
deriving (Show)
instance Encoding NtfSubNTFAction where
@@ -104,11 +106,13 @@ instance Encoding NtfSubNTFAction where
NSACreate -> "N"
NSACheck -> "C"
NSADelete -> "D"
NSARotate -> "R"
smpP =
A.anyChar >>= \case
'N' -> pure NSACreate
'C' -> pure NSACheck
'D' -> pure NSADelete
'R' -> pure NSARotate
_ -> fail "bad NtfSubNTFAction"
instance FromField NtfSubNTFAction where fromField = blobFieldDecoder smpDecode

View File

@@ -5,11 +5,15 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE RankNTypes #-}
{-# OPTIONS_GHC -fno-warn-incomplete-uni-patterns #-}
module AgentTests.FunctionalAPITests
( functionalAPITests,
testServerMatrix2,
makeConnection,
exchangeGreetingsMsgId,
switchComplete,
get,
(##>),
(=##>),
@@ -117,14 +121,12 @@ functionalAPITests t = do
it "should accept connection using async command" $
withSmpServer t testAcceptContactAsync
describe "Queue rotation" $ do
it "should switch delivery to the new queue (1 server)" $
withSmpServer t $ testSwitchConnection initAgentServers
it "should switch delivery to the new queue (2 servers)" $
withSmpServer t . withSmpServerOn t testPort2 $ testSwitchConnection initAgentServers2
it "should switch to new queue asynchronously (1 server)" $
withSmpServer t $ testSwitchAsync initAgentServers
it "should switch to new queue asynchronously (2 servers)" $
withSmpServer t . withSmpServerOn t testPort2 $ testSwitchAsync initAgentServers2
describe "should switch delivery to the new queue" $
testServerMatrix2 t testSwitchConnection
describe "should switch to new queue asynchronously" $
testServerMatrix2 t testSwitchAsync
describe "should delete connection during rotation" $
testServerMatrix2 t testSwitchDelete
testMatrix2 :: ATransport -> (AgentClient -> AgentClient -> AgentMsgId -> IO ()) -> Spec
testMatrix2 t runTest = do
@@ -140,6 +142,11 @@ testRatchetMatrix2 t runTest = do
it "ratchets v1 to v2" $ withSmpServer t $ runTestCfg2 agentCfgRatchetV1 agentCfg 3 runTest
it "ratchets v2 to v1" $ withSmpServer t $ runTestCfg2 agentCfg agentCfgRatchetV1 3 runTest
testServerMatrix2 :: ATransport -> (InitialAgentServers -> IO ()) -> Spec
testServerMatrix2 t runTest = do
it "1 server" $ withSmpServer t $ runTest initAgentServers
it "2 servers" $ withSmpServer t . withSmpServerOn t testPort2 $ runTest initAgentServers2
runTestCfg2 :: AgentConfig -> AgentConfig -> AgentMsgId -> (AgentClient -> AgentClient -> AgentMsgId -> IO ()) -> IO ()
runTestCfg2 aliceCfg bobCfg baseMsgId runTest = do
alice <- getSMPAgentClient aliceCfg initAgentServers
@@ -651,22 +658,24 @@ testAcceptContactAsync = do
testSwitchConnection :: InitialAgentServers -> IO ()
testSwitchConnection servers = do
a <- getSMPAgentClient agentCfg servers
b <- getSMPAgentClient agentCfg {database = testDB2} servers
b <- getSMPAgentClient agentCfg {database = testDB2, initialClientId = 1} servers
Right () <- runExceptT $ do
(aId, bId) <- makeConnection a b
exchangeGreetingsMsgId 4 a bId b aId
switchConnectionAsync a "" bId
phase a bId SPStarted
phase b aId SPStarted
phase a bId SPConfirmed
phase b aId SPConfirmed
phase a bId SPTested
phase b aId SPTested
phase b aId SPCompleted
phase a bId SPCompleted
exchangeGreetingsMsgId 12 a bId b aId
switchComplete a bId b aId
exchangeGreetingsMsgId 10 a bId b aId
pure ()
switchComplete :: AgentClient -> ByteString -> AgentClient -> ByteString -> ExceptT AgentErrorType IO ()
switchComplete a bId b aId = do
phase a bId SPStarted
phase b aId SPStarted
phase a bId SPConfirmed
phase b aId SPConfirmed
phase b aId SPCompleted
phase a bId SPCompleted
phase :: AgentClient -> ByteString -> SwitchPhase -> ExceptT AgentErrorType IO ()
phase c connId p =
get c >>= \(_, connId', msg) -> do
@@ -685,39 +694,51 @@ testSwitchAsync servers = do
(aId, bId) <- makeConnection a b
exchangeGreetingsMsgId 4 a bId b aId
pure (aId, bId)
let phaseA = withA . phase' bId
phaseB = withB . phase' aId
Right () <- withA $ \a -> runExceptT $ do
subscribeConnection a bId
let withA' = session withA bId
withB' = session withB aId
withA' $ \a -> do
switchConnectionAsync a "" bId
phase a bId SPStarted
liftIO $ threadDelay 500000
phaseB SPStarted
phaseA SPConfirmed
phaseB SPConfirmed
phaseA SPTested
Right () <- withB $ \b -> runExceptT $ do
subscribeConnection b aId
phase b aId SPTested
withB' $ \b -> phase b aId SPStarted
withA' $ \a -> phase a bId SPConfirmed
withB' $ \b -> do
phase b aId SPConfirmed
phase b aId SPCompleted
phaseA SPCompleted
withA' $ \a -> phase a bId SPCompleted
Right () <- withA $ \a -> withB $ \b -> runExceptT $ do
subscribeConnection a bId
subscribeConnection b aId
exchangeGreetingsMsgId 12 a bId b aId
exchangeGreetingsMsgId 10 a bId b aId
pure ()
where
withAgent :: AgentConfig -> (AgentClient -> IO a) -> IO a
withAgent cfg' = bracket (getSMPAgentClient cfg' servers) disconnectAgentClient
phase' connId p c = do
Right () <- runExceptT $ do
session :: (forall a. (AgentClient -> IO a) -> IO a) -> ConnId -> (AgentClient -> ExceptT AgentErrorType IO ()) -> IO ()
session withC connId a = do
Right () <- withC $ \c -> runExceptT $ do
subscribeConnection c connId
phase c connId p
r <- a c
liftIO $ threadDelay 500000
pure r
pure ()
withA = withAgent agentCfg
withB = withAgent agentCfg {database = testDB2, initialClientId = 1}
testSwitchDelete :: InitialAgentServers -> IO ()
testSwitchDelete servers = do
a <- getSMPAgentClient agentCfg servers
b <- getSMPAgentClient agentCfg {database = testDB2, initialClientId = 1} servers
Right () <- runExceptT $ do
(aId, bId) <- makeConnection a b
exchangeGreetingsMsgId 4 a bId b aId
disconnectAgentClient b
switchConnectionAsync a "" bId
phase a bId SPStarted
deleteConnectionAsync a "1" bId
("1", bId', OK) <- get a
liftIO $ bId `shouldBe` bId'
pure ()
exchangeGreetings :: AgentClient -> ConnId -> AgentClient -> ConnId -> ExceptT AgentErrorType IO ()
exchangeGreetings = exchangeGreetingsMsgId 4

View File

@@ -8,8 +8,7 @@
module AgentTests.NotificationTests where
-- import Control.Logger.Simple (LogConfig (..), LogLevel (..), setLogLevel, withGlobalLogging)
import AgentTests.FunctionalAPITests (get, makeConnection, (##>), (=##>), pattern Msg)
import AgentTests.FunctionalAPITests (exchangeGreetingsMsgId, get, makeConnection, switchComplete, testServerMatrix2, (##>), (=##>), pattern Msg)
import Control.Concurrent (killThread, threadDelay)
import Control.Monad.Except
import qualified Data.Aeson as J
@@ -22,7 +21,7 @@ import NtfClient
import SMPAgentClient (agentCfg, initAgentServers, testDB, testDB2)
import SMPClient (testPort, withSmpServer, withSmpServerStoreLogOn)
import Simplex.Messaging.Agent
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), databaseFile)
import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..), InitialAgentServers, databaseFile)
import Simplex.Messaging.Agent.Protocol
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
@@ -84,6 +83,10 @@ notificationTests t =
it "should resume subscriptions after SMP server is restarted" $ \_ ->
withAPNSMockServer $ \apns ->
withNtfServer t $ testNotificationsSMPRestart t apns
describe "should switch notifications to the new queue" $
testServerMatrix2 t $ \servers ->
withAPNSMockServer $ \apns ->
withNtfServer t $ testSwitchNotifications servers apns
testNotificationToken :: APNSMockServer -> IO ()
testNotificationToken APNSMockServer {apnsQ} = do
@@ -485,6 +488,28 @@ testNotificationsSMPRestart t APNSMockServer {apnsQ} = do
liftIO $ killThread threadId
pure ()
testSwitchNotifications :: InitialAgentServers -> APNSMockServer -> IO ()
testSwitchNotifications servers APNSMockServer {apnsQ} = do
a <- getSMPAgentClient agentCfg servers
b <- getSMPAgentClient agentCfg {database = testDB2, initialClientId = 1} servers
Right () <- runExceptT $ do
(aId, bId) <- makeConnection a b
exchangeGreetingsMsgId 4 a bId b aId
_ <- registerTestToken a "abcd" NMInstant apnsQ
liftIO $ threadDelay 250000
let testMessage msg = do
msgId <- sendMessage b aId (SMP.MsgFlags True) msg
get b ##> ("", aId, SENT msgId)
void $ messageNotification apnsQ
get a =##> \case ("", c, Msg msg') -> c == bId && msg == msg'; _ -> False
ackMessage a bId msgId
testMessage "hello"
switchConnectionAsync a "" bId
switchComplete a bId b aId
liftIO $ threadDelay 500000
testMessage "hello again"
pure ()
messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString)
messageNotification apnsQ = do
1000000 `timeout` atomically (readTBQueue apnsQ) >>= \case

View File

@@ -140,7 +140,7 @@ testForeignKeysEnabled =
`shouldThrow` (\e -> DB.sqlError e == DB.ErrorConstraint)
cData1 :: ConnData
cData1 = ConnData {connId = "conn1", connAgentVersion = 1, enableNtfs = True, duplexHandshake = Nothing}
cData1 = ConnData {connId = "conn1", connAgentVersion = 1, enableNtfs = True, duplexHandshake = Nothing, deleted = False}
testPrivateSignKey :: C.APrivateSignKey
testPrivateSignKey = C.APrivateSignKey C.SEd25519 "MC4CAQAwBQYDK2VwBCIEIDfEfevydXXfKajz3sRkcQ7RPvfWUPoq6pu1TYHV1DEe"
@@ -165,7 +165,6 @@ rcvQueue1 =
status = New,
dbQueueId = 1,
primary = True,
nextPrimary = False,
dbReplaceQueueId = Nothing,
smpClientVersion = 1,
clientNtfCreds = Nothing
@@ -184,7 +183,6 @@ sndQueue1 =
status = New,
dbQueueId = 1,
primary = True,
nextPrimary = False,
dbReplaceQueueId = Nothing,
smpClientVersion = 1
}
@@ -222,7 +220,7 @@ testCreateRcvConnDuplicate =
g <- newTVarIO =<< drgNew
_ <- createRcvConn db g cData1 rcvQueue1 SCMInvitation
createRcvConn db g cData1 rcvQueue1 SCMInvitation
`shouldReturn` Left (SEConnDuplicate)
`shouldReturn` Left SEConnDuplicate
testCreateSndConn :: SpecWith SQLiteStore
testCreateSndConn =
@@ -257,7 +255,7 @@ testCreateSndConnDuplicate =
g <- newTVarIO =<< drgNew
_ <- createSndConn db g cData1 sndQueue1
createSndConn db g cData1 sndQueue1
`shouldReturn` Left (SEConnDuplicate)
`shouldReturn` Left SEConnDuplicate
testGetRcvConn :: SpecWith SQLiteStore
testGetRcvConn =
@@ -267,7 +265,7 @@ testGetRcvConn =
g <- newTVarIO =<< drgNew
_ <- createRcvConn db g cData1 rcvQueue1 SCMInvitation
getRcvConn db smpServer recipientId
`shouldReturn` Right (SomeConn SCRcv (RcvConnection cData1 rcvQueue1))
`shouldReturn` Right (rcvQueue1, SomeConn SCRcv (RcvConnection cData1 rcvQueue1))
testDeleteRcvConn :: SpecWith SQLiteStore
testDeleteRcvConn =
@@ -280,7 +278,7 @@ testDeleteRcvConn =
`shouldReturn` ()
-- TODO check queues are deleted as well
getConn db "conn1"
`shouldReturn` Left (SEConnNotFound)
`shouldReturn` Left SEConnNotFound
testDeleteSndConn :: SpecWith SQLiteStore
testDeleteSndConn =
@@ -293,7 +291,7 @@ testDeleteSndConn =
`shouldReturn` ()
-- TODO check queues are deleted as well
getConn db "conn1"
`shouldReturn` Left (SEConnNotFound)
`shouldReturn` Left SEConnNotFound
testDeleteDuplexConn :: SpecWith SQLiteStore
testDeleteDuplexConn =
@@ -326,7 +324,6 @@ testUpgradeRcvConnToDuplex =
status = New,
dbQueueId = 1,
primary = True,
nextPrimary = False,
dbReplaceQueueId = Nothing,
smpClientVersion = 1
}
@@ -354,7 +351,6 @@ testUpgradeSndConnToDuplex =
status = New,
dbQueueId = 1,
primary = True,
nextPrimary = False,
dbReplaceQueueId = Nothing,
smpClientVersion = 1,
clientNtfCreds = Nothing