diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index e8af60b96..74d4e1c40 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -295,14 +295,14 @@ resubscribeConnections c = withAgentEnv c . resubscribeConnections' c sendMessage :: AgentErrorMonad m => AgentClient -> ConnId -> CR.PQEncryption -> MsgFlags -> MsgBody -> m (AgentMsgId, CR.PQEncryption) sendMessage c = withAgentEnv c .:: sendMessage' c -type MsgReq = (ConnId, MsgFlags, MsgBody) +type MsgReq = (ConnId, CR.PQEncryption, MsgFlags, MsgBody) -- | Send multiple messages to different connections (SEND command) -sendMessages :: MonadUnliftIO m => AgentClient -> CR.PQEncryption -> [MsgReq] -> m [Either AgentErrorType (AgentMsgId, CR.PQEncryption)] -sendMessages c = withAgentEnv c .: sendMessages' c +sendMessages :: MonadUnliftIO m => AgentClient -> [MsgReq] -> m [Either AgentErrorType (AgentMsgId, CR.PQEncryption)] +sendMessages c = withAgentEnv c . sendMessages' c -sendMessagesB :: (MonadUnliftIO m, Traversable t) => AgentClient -> CR.PQEncryption -> t (Either AgentErrorType MsgReq) -> m (t (Either AgentErrorType (AgentMsgId, CR.PQEncryption))) -sendMessagesB c = withAgentEnv c .: sendMessagesB' c +sendMessagesB :: (MonadUnliftIO m, Traversable t) => AgentClient -> t (Either AgentErrorType MsgReq) -> m (t (Either AgentErrorType (AgentMsgId, CR.PQEncryption))) +sendMessagesB c = withAgentEnv c . sendMessagesB' c ackMessage :: AgentErrorMonad m => AgentClient -> ConnId -> AgentMsgId -> Maybe MsgReceiptInfo -> m () ackMessage c = withAgentEnv c .:. ackMessage' c @@ -911,29 +911,29 @@ getNotificationMessage' c nonce encNtfInfo = do -- | Send message to the connection (SEND command) in Reader monad sendMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> CR.PQEncryption -> MsgFlags -> MsgBody -> m (AgentMsgId, CR.PQEncryption) -sendMessage' c connId pqEnc msgFlags msg = liftEither . runIdentity =<< sendMessagesB' c pqEnc (Identity (Right (connId, msgFlags, msg))) +sendMessage' c connId pqEnc msgFlags msg = liftEither . runIdentity =<< sendMessagesB' c (Identity (Right (connId, pqEnc, msgFlags, msg))) -- | Send multiple messages to different connections (SEND command) in Reader monad -sendMessages' :: forall m. AgentMonad' m => AgentClient -> CR.PQEncryption -> [MsgReq] -> m [Either AgentErrorType (AgentMsgId, CR.PQEncryption)] -sendMessages' c pqEnc = sendMessagesB' c pqEnc . map Right +sendMessages' :: forall m. AgentMonad' m => AgentClient -> [MsgReq] -> m [Either AgentErrorType (AgentMsgId, CR.PQEncryption)] +sendMessages' c = sendMessagesB' c . map Right -sendMessagesB' :: forall m t. (AgentMonad' m, Traversable t) => AgentClient -> CR.PQEncryption -> t (Either AgentErrorType MsgReq) -> m (t (Either AgentErrorType (AgentMsgId, CR.PQEncryption))) -sendMessagesB' c pqEnc reqs = withConnLocks c connIds "sendMessages" $ do - reqs' <- withStoreBatch c (\db -> fmap (bindRight $ \req@(connId, _, _) -> bimap storeError (req,) <$> getConn db connId) reqs) +sendMessagesB' :: forall m t. (AgentMonad' m, Traversable t) => AgentClient -> t (Either AgentErrorType MsgReq) -> m (t (Either AgentErrorType (AgentMsgId, CR.PQEncryption))) +sendMessagesB' c reqs = withConnLocks c connIds "sendMessages" $ do + reqs' <- withStoreBatch c (\db -> fmap (bindRight $ \req@(connId, _, _, _) -> bimap storeError (req,) <$> getConn db connId) reqs) let reqs'' = fmap (>>= prepareConn) reqs' - enqueueMessagesB c (Just pqEnc) reqs'' + enqueueMessagesB c reqs'' where - prepareConn :: (MsgReq, SomeConn) -> Either AgentErrorType (ConnData, NonEmpty SndQueue, MsgFlags, AMessage) - prepareConn ((_, msgFlags, msg), SomeConn _ conn) = case conn of + prepareConn :: (MsgReq, SomeConn) -> Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe CR.PQEncryption, MsgFlags, AMessage) + prepareConn ((_, pqEnc, msgFlags, msg), SomeConn _ conn) = case conn of DuplexConnection cData _ sqs -> prepareMsg cData sqs SndConnection cData sq -> prepareMsg cData [sq] _ -> Left $ CONN SIMPLEX where - prepareMsg :: ConnData -> NonEmpty SndQueue -> Either AgentErrorType (ConnData, NonEmpty SndQueue, MsgFlags, AMessage) + prepareMsg :: ConnData -> NonEmpty SndQueue -> Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe CR.PQEncryption, MsgFlags, AMessage) prepareMsg cData sqs | ratchetSyncSendProhibited cData = Left $ CMD PROHIBITED - | otherwise = Right (cData, sqs, msgFlags, A_MSG msg) - connIds = map (\(connId, _, _) -> connId) $ rights $ toList reqs + | otherwise = Right (cData, sqs, Just pqEnc, msgFlags, A_MSG msg) + connIds = map (\(connId, _, _, _) -> connId) $ rights $ toList reqs -- / async command processing v v v @@ -1089,11 +1089,11 @@ enqueueMessages c cData sqs pqEnc_ msgFlags aMessage = do enqueueMessages' :: AgentMonad m => AgentClient -> ConnData -> NonEmpty SndQueue -> Maybe CR.PQEncryption -> MsgFlags -> AMessage -> m (AgentMsgId, CR.PQEncryption) enqueueMessages' c cData sqs pqEnc_ msgFlags aMessage = - liftEither . runIdentity =<< enqueueMessagesB c pqEnc_ (Identity (Right (cData, sqs, msgFlags, aMessage))) + liftEither . runIdentity =<< enqueueMessagesB c (Identity (Right (cData, sqs, pqEnc_, msgFlags, aMessage))) -enqueueMessagesB :: (AgentMonad' m, Traversable t) => AgentClient -> Maybe CR.PQEncryption -> t (Either AgentErrorType (ConnData, NonEmpty SndQueue, MsgFlags, AMessage)) -> m (t (Either AgentErrorType (AgentMsgId, CR.PQEncryption))) -enqueueMessagesB c pqEnc_ reqs = do - reqs' <- enqueueMessageB c pqEnc_ reqs +enqueueMessagesB :: (AgentMonad' m, Traversable t) => AgentClient -> t (Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe CR.PQEncryption, MsgFlags, AMessage)) -> m (t (Either AgentErrorType (AgentMsgId, CR.PQEncryption))) +enqueueMessagesB c reqs = do + reqs' <- enqueueMessageB c reqs enqueueSavedMessageB c $ mapMaybe snd $ rights $ toList reqs' pure $ fst <$$> reqs' @@ -1102,20 +1102,20 @@ isActiveSndQ SndQueue {status} = status == Secured || status == Active enqueueMessage :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> Maybe CR.PQEncryption -> MsgFlags -> AMessage -> m (AgentMsgId, CR.PQEncryption) enqueueMessage c cData sq pqEnc_ msgFlags aMessage = - liftEither . fmap fst . runIdentity =<< enqueueMessageB c pqEnc_ (Identity (Right (cData, [sq], msgFlags, aMessage))) + liftEither . fmap fst . runIdentity =<< enqueueMessageB c (Identity (Right (cData, [sq], pqEnc_, msgFlags, aMessage))) -- this function is used only for sending messages in batch, it returns the list of successes to enqueue additional deliveries -enqueueMessageB :: forall m t. (AgentMonad' m, Traversable t) => AgentClient -> Maybe CR.PQEncryption -> t (Either AgentErrorType (ConnData, NonEmpty SndQueue, MsgFlags, AMessage)) -> m (t (Either AgentErrorType ((AgentMsgId, CR.PQEncryption), Maybe (ConnData, [SndQueue], AgentMsgId)))) -enqueueMessageB c pqEnc_ reqs = do +enqueueMessageB :: forall m t. (AgentMonad' m, Traversable t) => AgentClient -> t (Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe CR.PQEncryption, MsgFlags, AMessage)) -> m (t (Either AgentErrorType ((AgentMsgId, CR.PQEncryption), Maybe (ConnData, [SndQueue], AgentMsgId)))) +enqueueMessageB c reqs = do aVRange <- asks $ maxVersion . smpAgentVRange . config reqMids <- withStoreBatch c $ \db -> fmap (bindRight $ storeSentMsg db aVRange) reqs - forME reqMids $ \((cData, sq :| sqs, _, _), InternalId msgId, pqSecr) -> do + forME reqMids $ \((cData, sq :| sqs, _, _, _), InternalId msgId, pqSecr) -> do submitPendingMsg c cData sq let sqs' = filter isActiveSndQ sqs pure $ Right ((msgId, pqSecr), if null sqs' then Nothing else Just (cData, sqs', msgId)) where - storeSentMsg :: DB.Connection -> VersionSMPA -> (ConnData, NonEmpty SndQueue, MsgFlags, AMessage) -> IO (Either AgentErrorType ((ConnData, NonEmpty SndQueue, MsgFlags, AMessage), InternalId, CR.PQEncryption)) - storeSentMsg db agentVersion req@(ConnData {connId}, sq :| _, msgFlags, aMessage) = fmap (first storeError) $ runExceptT $ do + storeSentMsg :: DB.Connection -> VersionSMPA -> (ConnData, NonEmpty SndQueue, Maybe CR.PQEncryption, MsgFlags, AMessage) -> IO (Either AgentErrorType ((ConnData, NonEmpty SndQueue, Maybe CR.PQEncryption, MsgFlags, AMessage), InternalId, CR.PQEncryption)) + storeSentMsg db agentVersion req@(ConnData {connId}, sq :| _, pqEnc_, msgFlags, aMessage) = fmap (first storeError) $ runExceptT $ do internalTs <- liftIO getCurrentTime (internalId, internalSndId, prevMsgHash) <- liftIO $ updateSndIds db connId let privHeader = APrivHeader (unSndId internalSndId) prevMsgHash