switch message delivery

This commit is contained in:
Evgeny Poberezkin
2022-08-26 21:31:44 +01:00
parent b50f773dcd
commit b7dd971e3f
2 changed files with 32 additions and 8 deletions

View File

@@ -73,7 +73,7 @@ module Simplex.Messaging.Agent
)
where
import Control.Concurrent.STM (flushTBQueue, stateTVar)
import Control.Concurrent.STM (flushTBQueue, retry, stateTVar)
import Control.Logger.Simple (logInfo, showText)
import Control.Monad.Except
import Control.Monad.IO.Unlift (MonadUnliftIO)
@@ -111,6 +111,7 @@ import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Parsers (parse)
import Simplex.Messaging.Protocol (BrokerMsg, ErrorType (AUTH), MsgBody, MsgFlags, NtfServer, SMPMsgMeta, SndPublicVerifyKey)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Util
import Simplex.Messaging.Version
@@ -1382,12 +1383,32 @@ processSMPTransmission c@AgentClient {smpClients, subQ} transmission@(srv, v, se
-- processed by queue sender
rqSwitch :: (SMPServer, SMP.SenderId) -> m ()
rqSwitch (smpServer, senderId) = case conn of
DuplexConnection _ _ sq -> do
DuplexConnection _ _ sq@SndQueue {server, sndId} -> do
withStore' c (`getNextSndQueue` dbNextSndQueueId sq) >>= \case
Just sq'@SndQueue {server, sndId} -> do
unless (smpServer == server && senderId == sndId) . throwError $ INTERNAL "incorrect queue address"
withStore' c $ \db -> switchCurrSndQueue db sq sq'
-- update unsent messages? or just restart message deliveries?
Just sq'@SndQueue {server = server', sndId = sndId'} -> do
unless (smpServer == server' && senderId == sndId') . throwError $ INTERNAL "incorrect queue address"
let qKey = (connId, server, sndId)
qKey' = (connId, server', sndId')
ok <-
switchQueues qKey qKey' `catchError` \e -> do
atomically (switchDeliveries qKey' qKey)
throwError e
unless ok $ throwError $ INTERNAL "switching snd queue failed in STM"
where
switchQueues :: MsgDeliveryKey -> MsgDeliveryKey -> m Bool
switchQueues k k' = withStore' c $ \db -> do
ok <- atomically $ (switchDeliveries k k' $> True) `orElse` pure False
when ok $ switchCurrSndQueue db sq sq'
pure ok
switchDeliveries :: MsgDeliveryKey -> MsgDeliveryKey -> STM ()
switchDeliveries k k' = do
switchDelivery smpQueueMsgQueues k k'
switchDelivery smpQueueMsgDeliveries k k'
switchDelivery :: (AgentClient -> TMap MsgDeliveryKey a) -> MsgDeliveryKey -> MsgDeliveryKey -> STM ()
switchDelivery sel k k' =
TM.lookupDelete k (sel c) >>= \case
Just d -> TM.insert k' d (sel c)
_ -> retry
_ -> throwError $ INTERNAL "message can only be sent during rotation"
_ -> throwError $ INTERNAL "message can only be sent to duplex connection"

View File

@@ -16,6 +16,7 @@
module Simplex.Messaging.Agent.Client
( AgentClient (..),
MsgDeliveryKey,
newAgentClient,
withAgentLock,
closeAgentClient,
@@ -145,6 +146,8 @@ type SMPClientVar = TMVar (Either AgentErrorType SMPClient)
type NtfClientVar = TMVar (Either AgentErrorType NtfClient)
type MsgDeliveryKey = (ConnId, SMPServer, SMP.SenderId)
data AgentClient = AgentClient
{ active :: TVar Bool,
rcvQ :: TBQueue (ATransmission 'Client),
@@ -159,8 +162,8 @@ data AgentClient = AgentClient
pendingSubscrSrvrs :: TMap SMPServer (TMap ConnId RcvQueue),
subscrConns :: TMap ConnId SMPServer,
connMsgsQueued :: TMap ConnId Bool,
smpQueueMsgQueues :: TMap (ConnId, SMPServer, SMP.SenderId) (TQueue InternalId),
smpQueueMsgDeliveries :: TMap (ConnId, SMPServer, SMP.SenderId) (Async ()),
smpQueueMsgQueues :: TMap MsgDeliveryKey (TQueue InternalId),
smpQueueMsgDeliveries :: TMap MsgDeliveryKey (Async ()),
nextRcvQueueMsgs :: TMap (ConnId, SMPServer, SMP.RecipientId) [ServerTransmission BrokerMsg],
ntfNetworkOp :: TVar AgentOpState,
rcvNetworkOp :: TVar AgentOpState,