change message delivery loop

This commit is contained in:
Evgeny Poberezkin
2022-08-26 14:54:58 +01:00
parent b558eb8243
commit b50f773dcd
2 changed files with 95 additions and 75 deletions

View File

@@ -502,7 +502,7 @@ deleteCurrRcvQueue c ConnData {connId} rq _sq rq'@RcvQueue {server, rcvId} = do
withStore' c $ \db -> switchCurrRcvQueue db rq rq'
atomically $
TM.lookupDelete (connId, server, rcvId) (nextRcvQueueMsgs c)
>>= mapM_ (mapM_ . writeTBQueue $ msgQ c)
>>= mapM_ ((mapM_ . writeTBQueue $ msgQ c) . reverse)
subscribeConnections' :: forall m. AgentMonad m => AgentClient -> [ConnId] -> m (Map ConnId (Either AgentErrorType ()))
subscribeConnections' _ [] = pure M.empty
@@ -644,8 +644,9 @@ enqueueMessage c cData@ConnData {connId, connAgentVersion} sq msgFlags aMessage
resumeMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> m ()
resumeMsgDelivery c cData@ConnData {connId} sq@SndQueue {server, sndId} = do
let qKey = (connId, server, sndId)
unlessM (queueDelivering qKey) $
async (runSmpQueueMsgDelivery c cData sq)
unlessM (queueDelivering qKey) $ do
mq <- atomically $ getPendingMsgQ c connId sq
async (runSmpQueueMsgDelivery c cData mq)
>>= \a -> atomically (TM.insert qKey a $ smpQueueMsgDeliveries c)
unlessM connQueued $
withStore' c (`getPendingMsgs` connId)
@@ -672,9 +673,8 @@ getPendingMsgQ c connId SndQueue {server, sndId} = do
TM.insert qKey mq $ smpQueueMsgQueues c
pure mq
runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> m ()
runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandshake} sq = do
mq <- atomically $ getPendingMsgQ c connId sq
runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnData -> TQueue InternalId -> m ()
runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandshake} mq = do
ri <- asks $ messageRetryInterval . config
forever $ do
atomically $ endAgentOperation c AOSndNetwork
@@ -686,7 +686,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
E.try (withStore c $ \db -> getPendingMsgData db connId msgId) >>= \case
Left (e :: E.SomeException) ->
notify $ MERR mId (INTERNAL $ show e)
Right (rq_, PendingMsgData {msgType, msgBody, msgFlags, internalTs}) ->
Right (rq_, sq, PendingMsgData {msgType, msgBody, msgFlags, internalTs}) ->
withRetryInterval ri $ \loop -> do
resp <- tryError $ case msgType of
AM_CONN_INFO -> sendConfirmation c sq msgBody
@@ -1094,7 +1094,7 @@ subscriber c@AgentClient {msgQ} = forever $ do
Right _ -> return ()
processSMPTransmission :: forall m. AgentMonad m => AgentClient -> ServerTransmission BrokerMsg -> m ()
processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cmd) = do
processSMPTransmission c@AgentClient {smpClients, subQ} transmission@(srv, v, sessId, rId, cmd) = do
(rq, SomeConn _ conn) <- withStore c $ \db -> getRcvConn db srv rId
processSMP conn (connData conn) rq
where
@@ -1109,6 +1109,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
unless (phVer `isCompatible` clientVRange) . throwError $ AGENT A_VERSION
case (e2eDhSecret, e2ePubKey_) of
(Nothing, Just e2ePubKey) -> do
unless (currRcvQueue) . throwError $ INTERNAL "can only be sent to the current queue"
let e2eDh = C.dh' e2ePubKey e2ePrivKey
decryptClientMessage e2eDh clientMsg >>= \case
(SMP.PHConfirmation senderKey, AgentConfirmation {e2eEncryption, encConnInfo, agentVersion}) ->
@@ -1124,9 +1125,13 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
HELLO -> helloMsg >> ackDelete msgId
REPLY cReq -> replyMsg cReq >> ackDelete msgId
-- note that there is no ACK sent for A_MSG, it is sent with agent's user ACK command
A_MSG body -> do
logServer "<--" c srv rId "MSG <MSG>"
notify $ MSG msgMeta msgFlags body
A_MSG body
| currRcvQueue -> do
logServer "<--" c srv rId "MSG <MSG>"
notify $ MSG msgMeta msgFlags body
| otherwise -> atomically $ TM.alter addTransmission (connId, srv, rId) (nextRcvQueueMsgs c)
where
addTransmission = Just . maybe [transmission] (transmission :)
QNEW currAddr nextQUri -> rqNewMsg currAddr nextQUri >> ackDelete msgId
QKEYS sKey nextQInfo -> rqKeys sKey nextQInfo $ ackDelete msgId
QREADY addr -> rqReady addr >> ackDelete msgId
@@ -1141,9 +1146,18 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
withStore' c $ \db -> deleteMsg db connId internalId
| otherwise -> do
liftEither (parse smpP (AGENT A_MESSAGE) agentMsgBody) >>= \case
AgentMessage _ (A_MSG body) -> do
logServer "<--" c srv rId "MSG <MSG>"
notify $ MSG msgMeta msgFlags body
AgentMessage _ (A_MSG body)
| currRcvQueue -> do
logServer "<--" c srv rId "MSG <MSG>"
notify $ MSG msgMeta msgFlags body
| otherwise -> atomically $ TM.alter addTransmission (connId, srv, rId) (nextRcvQueueMsgs c)
where
addTransmission = Just . maybe [transmission] prependIfDifferent
prependIfDifferent = \case
[] -> [transmission]
ts@((_, _, _, _, cmd') : _)
| cmd == cmd' -> ts
| otherwise -> transmission : ts
_ -> pure ()
_ -> throwError e
Left e -> throwError e
@@ -1266,6 +1280,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
helloMsg :: m ()
helloMsg = do
unless currRcvQueue . throwError $ INTERNAL "can only be sent to the current queue"
logServer "<--" c srv rId "MSG <HELLO>"
case status of
Active -> prohibited
@@ -1287,6 +1302,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
replyMsg :: L.NonEmpty SMPQueueInfo -> m ()
replyMsg smpQueues = do
unless currRcvQueue . throwError $ INTERNAL "can only be sent to the current queue"
logServer "<--" c srv rId "MSG <REPLY>"
case duplexHandshake of
Just True -> prohibited
@@ -1298,70 +1314,61 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
-- processed by queue sender
rqNewMsg :: (SMPServer, SMP.SenderId) -> SMPQueueUri -> m ()
rqNewMsg (smpServer, senderId) nextQUri
| currRcvQueue = case conn of
DuplexConnection _ _ sq@SndQueue {server, sndId}
| smpServer == server && senderId == sndId -> do
clientVRange <- asks $ smpClientVRange . config
case (nextQUri `compatibleVersion` clientVRange) of
Just qInfo@(Compatible nextQInfo) -> do
sq'@SndQueue {sndPublicKey, e2ePubKey} <- newSndQueue qInfo False
withStore' c $ \db -> dbCreateNextSndQueue db sq sq'
case (sndPublicKey, e2ePubKey) of
(Just nextSenderKey, Just dhPublicKey) -> do
let qAddr = (queueAddress (nextQInfo :: SMPQueueInfo)) {dhPublicKey}
nextQueueInfo = (nextQInfo :: SMPQueueInfo) {queueAddress = qAddr}
void . enqueueMessage c cData sq SMP.noMsgFlags $ QKEYS {nextSenderKey, nextQueueInfo}
rq' <- withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq)
notify . SWITCH SPStarted $ connectionStats conn rq' (Just sq')
_ -> throwError $ INTERNAL "absent sender keys"
_ -> throwError $ AGENT A_VERSION
| otherwise -> throwError $ INTERNAL "incorrect queue address"
_ -> throwError $ INTERNAL "message can only be sent to duplex connection"
| otherwise = throwError $ INTERNAL "message can only be sent to current queue"
rqNewMsg (smpServer, senderId) nextQUri = case conn of
DuplexConnection _ _ sq@SndQueue {server, sndId} -> do
unless (smpServer == server && senderId == sndId) . throwError $ INTERNAL "incorrect queue address"
clientVRange <- asks $ smpClientVRange . config
case (nextQUri `compatibleVersion` clientVRange) of
Just qInfo@(Compatible nextQInfo) -> do
sq'@SndQueue {sndPublicKey, e2ePubKey} <- newSndQueue qInfo False
withStore' c $ \db -> dbCreateNextSndQueue db sq sq'
case (sndPublicKey, e2ePubKey) of
(Just nextSenderKey, Just dhPublicKey) -> do
let qAddr = (queueAddress (nextQInfo :: SMPQueueInfo)) {dhPublicKey}
nextQueueInfo = (nextQInfo :: SMPQueueInfo) {queueAddress = qAddr}
void . enqueueMessage c cData sq SMP.noMsgFlags $ QKEYS {nextSenderKey, nextQueueInfo}
rq' <- withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq)
notify . SWITCH SPStarted $ connectionStats conn rq' (Just sq')
_ -> throwError $ INTERNAL "absent sender keys"
_ -> throwError $ AGENT A_VERSION
_ -> throwError $ INTERNAL "message can only be sent to duplex connection"
-- processed by queue recipient
rqKeys :: SndPublicVerifyKey -> SMPQueueInfo -> m () -> m ()
rqKeys senderKey qInfo ackDelete
| currRcvQueue = case conn of
rqKeys senderKey qInfo@(SMPQueueInfo clntVer' SMPQueueAddress {smpServer, senderId, dhPublicKey}) ackDelete = do
unless currRcvQueue . throwError $ INTERNAL "message can only be sent to current queue"
case conn of
DuplexConnection _ _ sq -> do
clientVRange <- asks $ smpClientVRange . config
case qInfo `proveCompatible` clientVRange of
Just (Compatible (SMPQueueInfo clntVer' SMPQueueAddress {smpServer, senderId, dhPublicKey})) -> do
withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq) >>= \case
Just rq'@RcvQueue {server, sndId, e2ePrivKey = dhPrivKey, smpClientVersion = clntVer}
| server == smpServer && sndId == senderId -> do
let dhSecret = C.dh' dhPublicKey dhPrivKey
withStore' c $ \db -> do
setRcvQueueConfirmedE2E db rq' senderKey dhSecret $ min clntVer clntVer'
setRcvQueueAction db rq $ Just RQASecureNextQueue
ackDelete
secureNextRcvQueue c cData rq sq rq'
| otherwise -> throwError $ INTERNAL "incorrect queue address"
_ -> throwError $ INTERNAL "message can only be sent during rotation"
_ -> throwError $ AGENT A_VERSION
unless (qInfo `isCompatible` clientVRange) . throwError $ AGENT A_VERSION
withStore' c (`getNextRcvQueue` dbNextRcvQueueId rq) >>= \case
Just rq'@RcvQueue {server, sndId, e2ePrivKey = dhPrivKey, smpClientVersion = clntVer} -> do
unless (smpServer == server && senderId == sndId) . throwError $ INTERNAL "incorrect queue address"
let dhSecret = C.dh' dhPublicKey dhPrivKey
withStore' c $ \db -> do
setRcvQueueConfirmedE2E db rq' senderKey dhSecret $ min clntVer clntVer'
setRcvQueueAction db rq $ Just RQASecureNextQueue
ackDelete
secureNextRcvQueue c cData rq sq rq'
_ -> throwError $ INTERNAL "message can only be sent during rotation"
_ -> throwError $ INTERNAL "message can only be sent to duplex connection"
| otherwise = throwError $ INTERNAL "message can only be sent to current queue"
-- processed by queue sender
rqReady :: (SMPServer, SMP.SenderId) -> m ()
rqReady (smpServer, senderId)
| currRcvQueue = case conn of
DuplexConnection _ _ sq ->
withStore' c (`getNextSndQueue` dbNextSndQueueId sq) >>= \case
Just sq'@SndQueue {server, sndId}
| server == smpServer && sndId == senderId ->
void $ enqueueMessage c cData sq' SMP.noMsgFlags QHELLO
| otherwise -> throwError $ INTERNAL "incorrect queue address"
_ -> throwError $ INTERNAL "message can only be sent during rotation"
_ -> throwError $ INTERNAL "message can only be sent to duplex connection"
| otherwise = throwError $ INTERNAL "message can only be sent to current queue"
rqReady (smpServer, senderId) = case conn of
DuplexConnection _ _ sq ->
withStore' c (`getNextSndQueue` dbNextSndQueueId sq) >>= \case
Just sq'@SndQueue {server, sndId} -> do
unless (smpServer == server && senderId == sndId) . throwError $ INTERNAL "incorrect queue address"
void $ enqueueMessage c cData sq' SMP.noMsgFlags QHELLO
_ -> throwError $ INTERNAL "message can only be sent during rotation"
_ -> throwError $ INTERNAL "message can only be sent to duplex connection"
-- processed by queue recipient, received from the new queue
rqHello :: m () -> m ()
rqHello ackDelete
| currRcvQueue = throwError $ INTERNAL "message can only be sent to the next queue"
| otherwise = case conn of
rqHello ackDelete = do
when currRcvQueue . throwError $ INTERNAL "message can only be sent to the next queue"
case conn of
DuplexConnection _ currRq sq -> do
let RcvQueue {server, sndId} = rq
void . enqueueMessage c cData sq SMP.noMsgFlags $ QSWITCH (server, sndId)
@@ -1374,11 +1381,15 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
-- processed by queue sender
rqSwitch :: (SMPServer, SMP.SenderId) -> m ()
rqSwitch _addr = do
-- make new queue active
-- delete old queue from the database
-- update unsent messages? or just restart message deliveries?
pure ()
rqSwitch (smpServer, senderId) = case conn of
DuplexConnection _ _ sq -> do
withStore' c (`getNextSndQueue` dbNextSndQueueId sq) >>= \case
Just sq'@SndQueue {server, sndId} -> do
unless (smpServer == server && senderId == sndId) . throwError $ INTERNAL "incorrect queue address"
withStore' c $ \db -> switchCurrSndQueue db sq sq'
-- update unsent messages? or just restart message deliveries?
_ -> throwError $ INTERNAL "message can only be sent during rotation"
_ -> throwError $ INTERNAL "message can only be sent to duplex connection"
smpInvitation :: ConnectionRequestUri 'CMInvitation -> ConnInfo -> m ()
smpInvitation connReq@(CRInvitationUri crData _) cInfo = do

View File

@@ -44,6 +44,7 @@ module Simplex.Messaging.Agent.Store.SQLite
dbCreateNextSndQueue,
setRcvQueueAction,
switchCurrRcvQueue,
switchCurrSndQueue,
-- Confirmations
createConfirmation,
acceptConfirmation,
@@ -429,6 +430,13 @@ setRcvQueueAction _db _rq _rqAction_ = pure ()
switchCurrRcvQueue :: DB.Connection -> RcvQueue -> RcvQueue -> IO ()
switchCurrRcvQueue _db _rq _nextRq = do
-- make a new queue a main one
-- delete old queue from the database
pure ()
switchCurrSndQueue :: DB.Connection -> SndQueue -> SndQueue -> IO ()
switchCurrSndQueue _db _sq _nextSq = do
-- make new queue active
-- delete old queue from the database
pure ()
type SMPConfirmationRow = (SndPublicVerifyKey, C.PublicKeyX25519, ConnInfo, Maybe [SMPQueueInfo], Maybe Version)
@@ -601,10 +609,11 @@ createSndMsg db connId sndMsgData = do
insertSndMsgDetails_ db connId sndMsgData
updateHashSnd_ db connId sndMsgData
getPendingMsgData :: DB.Connection -> ConnId -> InternalId -> IO (Either StoreError (Maybe RcvQueue, PendingMsgData))
getPendingMsgData db connId msgId = do
rq_ <- getRcvQueueByConnId_ db connId
(rq_,) <$$> firstRow pendingMsgData SEMsgNotFound getMsgData_
getPendingMsgData :: DB.Connection -> ConnId -> InternalId -> IO (Either StoreError (Maybe RcvQueue, SndQueue, PendingMsgData))
getPendingMsgData db connId msgId = runExceptT $ do
rq_ <- liftIO $ getRcvQueueByConnId_ db connId
sq <- ExceptT $ maybe (Left SEConnNotFound) Right <$> getSndQueueByConnId_ db connId
ExceptT $ (rq_,sq,) <$$> firstRow pendingMsgData SEMsgNotFound getMsgData_
where
getMsgData_ =
DB.query