From b7dd971e3f55f2f0ec8d94ca4177204f60f0ca24 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Fri, 26 Aug 2022 21:31:44 +0100 Subject: [PATCH] switch message delivery --- src/Simplex/Messaging/Agent.hs | 33 ++++++++++++++++++++++----- src/Simplex/Messaging/Agent/Client.hs | 7 ++++-- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index df2ce5ebc..44fbb6fa3 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -73,7 +73,7 @@ module Simplex.Messaging.Agent ) where -import Control.Concurrent.STM (flushTBQueue, stateTVar) +import Control.Concurrent.STM (flushTBQueue, retry, stateTVar) import Control.Logger.Simple (logInfo, showText) import Control.Monad.Except import Control.Monad.IO.Unlift (MonadUnliftIO) @@ -111,6 +111,7 @@ import Simplex.Messaging.Notifications.Types import Simplex.Messaging.Parsers (parse) import Simplex.Messaging.Protocol (BrokerMsg, ErrorType (AUTH), MsgBody, MsgFlags, NtfServer, SMPMsgMeta, SndPublicVerifyKey) import qualified Simplex.Messaging.Protocol as SMP +import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util import Simplex.Messaging.Version @@ -1382,12 +1383,32 @@ processSMPTransmission c@AgentClient {smpClients, subQ} transmission@(srv, v, se -- processed by queue sender rqSwitch :: (SMPServer, SMP.SenderId) -> m () rqSwitch (smpServer, senderId) = case conn of - DuplexConnection _ _ sq -> do + DuplexConnection _ _ sq@SndQueue {server, sndId} -> do withStore' c (`getNextSndQueue` dbNextSndQueueId sq) >>= \case - Just sq'@SndQueue {server, sndId} -> do - unless (smpServer == server && senderId == sndId) . throwError $ INTERNAL "incorrect queue address" - withStore' c $ \db -> switchCurrSndQueue db sq sq' - -- update unsent messages? or just restart message deliveries? + Just sq'@SndQueue {server = server', sndId = sndId'} -> do + unless (smpServer == server' && senderId == sndId') . throwError $ INTERNAL "incorrect queue address" + let qKey = (connId, server, sndId) + qKey' = (connId, server', sndId') + ok <- + switchQueues qKey qKey' `catchError` \e -> do + atomically (switchDeliveries qKey' qKey) + throwError e + unless ok $ throwError $ INTERNAL "switching snd queue failed in STM" + where + switchQueues :: MsgDeliveryKey -> MsgDeliveryKey -> m Bool + switchQueues k k' = withStore' c $ \db -> do + ok <- atomically $ (switchDeliveries k k' $> True) `orElse` pure False + when ok $ switchCurrSndQueue db sq sq' + pure ok + switchDeliveries :: MsgDeliveryKey -> MsgDeliveryKey -> STM () + switchDeliveries k k' = do + switchDelivery smpQueueMsgQueues k k' + switchDelivery smpQueueMsgDeliveries k k' + switchDelivery :: (AgentClient -> TMap MsgDeliveryKey a) -> MsgDeliveryKey -> MsgDeliveryKey -> STM () + switchDelivery sel k k' = + TM.lookupDelete k (sel c) >>= \case + Just d -> TM.insert k' d (sel c) + _ -> retry _ -> throwError $ INTERNAL "message can only be sent during rotation" _ -> throwError $ INTERNAL "message can only be sent to duplex connection" diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 3bb1862fd..035643bae 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -16,6 +16,7 @@ module Simplex.Messaging.Agent.Client ( AgentClient (..), + MsgDeliveryKey, newAgentClient, withAgentLock, closeAgentClient, @@ -145,6 +146,8 @@ type SMPClientVar = TMVar (Either AgentErrorType SMPClient) type NtfClientVar = TMVar (Either AgentErrorType NtfClient) +type MsgDeliveryKey = (ConnId, SMPServer, SMP.SenderId) + data AgentClient = AgentClient { active :: TVar Bool, rcvQ :: TBQueue (ATransmission 'Client), @@ -159,8 +162,8 @@ data AgentClient = AgentClient pendingSubscrSrvrs :: TMap SMPServer (TMap ConnId RcvQueue), subscrConns :: TMap ConnId SMPServer, connMsgsQueued :: TMap ConnId Bool, - smpQueueMsgQueues :: TMap (ConnId, SMPServer, SMP.SenderId) (TQueue InternalId), - smpQueueMsgDeliveries :: TMap (ConnId, SMPServer, SMP.SenderId) (Async ()), + smpQueueMsgQueues :: TMap MsgDeliveryKey (TQueue InternalId), + smpQueueMsgDeliveries :: TMap MsgDeliveryKey (Async ()), nextRcvQueueMsgs :: TMap (ConnId, SMPServer, SMP.RecipientId) [ServerTransmission BrokerMsg], ntfNetworkOp :: TVar AgentOpState, rcvNetworkOp :: TVar AgentOpState,