mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-26 07:34:53 +00:00
agent: pass PQ encryption flag separately for each message in batch APIs (#1027)
This commit is contained in:
committed by
GitHub
parent
ba1bfaa5aa
commit
52a67daea6
@@ -295,14 +295,14 @@ resubscribeConnections c = withAgentEnv c . resubscribeConnections' c
|
||||
sendMessage :: AgentErrorMonad m => AgentClient -> ConnId -> CR.PQEncryption -> MsgFlags -> MsgBody -> m (AgentMsgId, CR.PQEncryption)
|
||||
sendMessage c = withAgentEnv c .:: sendMessage' c
|
||||
|
||||
type MsgReq = (ConnId, MsgFlags, MsgBody)
|
||||
type MsgReq = (ConnId, CR.PQEncryption, MsgFlags, MsgBody)
|
||||
|
||||
-- | Send multiple messages to different connections (SEND command)
|
||||
sendMessages :: MonadUnliftIO m => AgentClient -> CR.PQEncryption -> [MsgReq] -> m [Either AgentErrorType (AgentMsgId, CR.PQEncryption)]
|
||||
sendMessages c = withAgentEnv c .: sendMessages' c
|
||||
sendMessages :: MonadUnliftIO m => AgentClient -> [MsgReq] -> m [Either AgentErrorType (AgentMsgId, CR.PQEncryption)]
|
||||
sendMessages c = withAgentEnv c . sendMessages' c
|
||||
|
||||
sendMessagesB :: (MonadUnliftIO m, Traversable t) => AgentClient -> CR.PQEncryption -> t (Either AgentErrorType MsgReq) -> m (t (Either AgentErrorType (AgentMsgId, CR.PQEncryption)))
|
||||
sendMessagesB c = withAgentEnv c .: sendMessagesB' c
|
||||
sendMessagesB :: (MonadUnliftIO m, Traversable t) => AgentClient -> t (Either AgentErrorType MsgReq) -> m (t (Either AgentErrorType (AgentMsgId, CR.PQEncryption)))
|
||||
sendMessagesB c = withAgentEnv c . sendMessagesB' c
|
||||
|
||||
ackMessage :: AgentErrorMonad m => AgentClient -> ConnId -> AgentMsgId -> Maybe MsgReceiptInfo -> m ()
|
||||
ackMessage c = withAgentEnv c .:. ackMessage' c
|
||||
@@ -911,29 +911,29 @@ getNotificationMessage' c nonce encNtfInfo = do
|
||||
|
||||
-- | Send message to the connection (SEND command) in Reader monad
|
||||
sendMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> CR.PQEncryption -> MsgFlags -> MsgBody -> m (AgentMsgId, CR.PQEncryption)
|
||||
sendMessage' c connId pqEnc msgFlags msg = liftEither . runIdentity =<< sendMessagesB' c pqEnc (Identity (Right (connId, msgFlags, msg)))
|
||||
sendMessage' c connId pqEnc msgFlags msg = liftEither . runIdentity =<< sendMessagesB' c (Identity (Right (connId, pqEnc, msgFlags, msg)))
|
||||
|
||||
-- | Send multiple messages to different connections (SEND command) in Reader monad
|
||||
sendMessages' :: forall m. AgentMonad' m => AgentClient -> CR.PQEncryption -> [MsgReq] -> m [Either AgentErrorType (AgentMsgId, CR.PQEncryption)]
|
||||
sendMessages' c pqEnc = sendMessagesB' c pqEnc . map Right
|
||||
sendMessages' :: forall m. AgentMonad' m => AgentClient -> [MsgReq] -> m [Either AgentErrorType (AgentMsgId, CR.PQEncryption)]
|
||||
sendMessages' c = sendMessagesB' c . map Right
|
||||
|
||||
sendMessagesB' :: forall m t. (AgentMonad' m, Traversable t) => AgentClient -> CR.PQEncryption -> t (Either AgentErrorType MsgReq) -> m (t (Either AgentErrorType (AgentMsgId, CR.PQEncryption)))
|
||||
sendMessagesB' c pqEnc reqs = withConnLocks c connIds "sendMessages" $ do
|
||||
reqs' <- withStoreBatch c (\db -> fmap (bindRight $ \req@(connId, _, _) -> bimap storeError (req,) <$> getConn db connId) reqs)
|
||||
sendMessagesB' :: forall m t. (AgentMonad' m, Traversable t) => AgentClient -> t (Either AgentErrorType MsgReq) -> m (t (Either AgentErrorType (AgentMsgId, CR.PQEncryption)))
|
||||
sendMessagesB' c reqs = withConnLocks c connIds "sendMessages" $ do
|
||||
reqs' <- withStoreBatch c (\db -> fmap (bindRight $ \req@(connId, _, _, _) -> bimap storeError (req,) <$> getConn db connId) reqs)
|
||||
let reqs'' = fmap (>>= prepareConn) reqs'
|
||||
enqueueMessagesB c (Just pqEnc) reqs''
|
||||
enqueueMessagesB c reqs''
|
||||
where
|
||||
prepareConn :: (MsgReq, SomeConn) -> Either AgentErrorType (ConnData, NonEmpty SndQueue, MsgFlags, AMessage)
|
||||
prepareConn ((_, msgFlags, msg), SomeConn _ conn) = case conn of
|
||||
prepareConn :: (MsgReq, SomeConn) -> Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe CR.PQEncryption, MsgFlags, AMessage)
|
||||
prepareConn ((_, pqEnc, msgFlags, msg), SomeConn _ conn) = case conn of
|
||||
DuplexConnection cData _ sqs -> prepareMsg cData sqs
|
||||
SndConnection cData sq -> prepareMsg cData [sq]
|
||||
_ -> Left $ CONN SIMPLEX
|
||||
where
|
||||
prepareMsg :: ConnData -> NonEmpty SndQueue -> Either AgentErrorType (ConnData, NonEmpty SndQueue, MsgFlags, AMessage)
|
||||
prepareMsg :: ConnData -> NonEmpty SndQueue -> Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe CR.PQEncryption, MsgFlags, AMessage)
|
||||
prepareMsg cData sqs
|
||||
| ratchetSyncSendProhibited cData = Left $ CMD PROHIBITED
|
||||
| otherwise = Right (cData, sqs, msgFlags, A_MSG msg)
|
||||
connIds = map (\(connId, _, _) -> connId) $ rights $ toList reqs
|
||||
| otherwise = Right (cData, sqs, Just pqEnc, msgFlags, A_MSG msg)
|
||||
connIds = map (\(connId, _, _, _) -> connId) $ rights $ toList reqs
|
||||
|
||||
-- / async command processing v v v
|
||||
|
||||
@@ -1089,11 +1089,11 @@ enqueueMessages c cData sqs pqEnc_ msgFlags aMessage = do
|
||||
|
||||
enqueueMessages' :: AgentMonad m => AgentClient -> ConnData -> NonEmpty SndQueue -> Maybe CR.PQEncryption -> MsgFlags -> AMessage -> m (AgentMsgId, CR.PQEncryption)
|
||||
enqueueMessages' c cData sqs pqEnc_ msgFlags aMessage =
|
||||
liftEither . runIdentity =<< enqueueMessagesB c pqEnc_ (Identity (Right (cData, sqs, msgFlags, aMessage)))
|
||||
liftEither . runIdentity =<< enqueueMessagesB c (Identity (Right (cData, sqs, pqEnc_, msgFlags, aMessage)))
|
||||
|
||||
enqueueMessagesB :: (AgentMonad' m, Traversable t) => AgentClient -> Maybe CR.PQEncryption -> t (Either AgentErrorType (ConnData, NonEmpty SndQueue, MsgFlags, AMessage)) -> m (t (Either AgentErrorType (AgentMsgId, CR.PQEncryption)))
|
||||
enqueueMessagesB c pqEnc_ reqs = do
|
||||
reqs' <- enqueueMessageB c pqEnc_ reqs
|
||||
enqueueMessagesB :: (AgentMonad' m, Traversable t) => AgentClient -> t (Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe CR.PQEncryption, MsgFlags, AMessage)) -> m (t (Either AgentErrorType (AgentMsgId, CR.PQEncryption)))
|
||||
enqueueMessagesB c reqs = do
|
||||
reqs' <- enqueueMessageB c reqs
|
||||
enqueueSavedMessageB c $ mapMaybe snd $ rights $ toList reqs'
|
||||
pure $ fst <$$> reqs'
|
||||
|
||||
@@ -1102,20 +1102,20 @@ isActiveSndQ SndQueue {status} = status == Secured || status == Active
|
||||
|
||||
enqueueMessage :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> Maybe CR.PQEncryption -> MsgFlags -> AMessage -> m (AgentMsgId, CR.PQEncryption)
|
||||
enqueueMessage c cData sq pqEnc_ msgFlags aMessage =
|
||||
liftEither . fmap fst . runIdentity =<< enqueueMessageB c pqEnc_ (Identity (Right (cData, [sq], msgFlags, aMessage)))
|
||||
liftEither . fmap fst . runIdentity =<< enqueueMessageB c (Identity (Right (cData, [sq], pqEnc_, msgFlags, aMessage)))
|
||||
|
||||
-- this function is used only for sending messages in batch, it returns the list of successes to enqueue additional deliveries
|
||||
enqueueMessageB :: forall m t. (AgentMonad' m, Traversable t) => AgentClient -> Maybe CR.PQEncryption -> t (Either AgentErrorType (ConnData, NonEmpty SndQueue, MsgFlags, AMessage)) -> m (t (Either AgentErrorType ((AgentMsgId, CR.PQEncryption), Maybe (ConnData, [SndQueue], AgentMsgId))))
|
||||
enqueueMessageB c pqEnc_ reqs = do
|
||||
enqueueMessageB :: forall m t. (AgentMonad' m, Traversable t) => AgentClient -> t (Either AgentErrorType (ConnData, NonEmpty SndQueue, Maybe CR.PQEncryption, MsgFlags, AMessage)) -> m (t (Either AgentErrorType ((AgentMsgId, CR.PQEncryption), Maybe (ConnData, [SndQueue], AgentMsgId))))
|
||||
enqueueMessageB c reqs = do
|
||||
aVRange <- asks $ maxVersion . smpAgentVRange . config
|
||||
reqMids <- withStoreBatch c $ \db -> fmap (bindRight $ storeSentMsg db aVRange) reqs
|
||||
forME reqMids $ \((cData, sq :| sqs, _, _), InternalId msgId, pqSecr) -> do
|
||||
forME reqMids $ \((cData, sq :| sqs, _, _, _), InternalId msgId, pqSecr) -> do
|
||||
submitPendingMsg c cData sq
|
||||
let sqs' = filter isActiveSndQ sqs
|
||||
pure $ Right ((msgId, pqSecr), if null sqs' then Nothing else Just (cData, sqs', msgId))
|
||||
where
|
||||
storeSentMsg :: DB.Connection -> VersionSMPA -> (ConnData, NonEmpty SndQueue, MsgFlags, AMessage) -> IO (Either AgentErrorType ((ConnData, NonEmpty SndQueue, MsgFlags, AMessage), InternalId, CR.PQEncryption))
|
||||
storeSentMsg db agentVersion req@(ConnData {connId}, sq :| _, msgFlags, aMessage) = fmap (first storeError) $ runExceptT $ do
|
||||
storeSentMsg :: DB.Connection -> VersionSMPA -> (ConnData, NonEmpty SndQueue, Maybe CR.PQEncryption, MsgFlags, AMessage) -> IO (Either AgentErrorType ((ConnData, NonEmpty SndQueue, Maybe CR.PQEncryption, MsgFlags, AMessage), InternalId, CR.PQEncryption))
|
||||
storeSentMsg db agentVersion req@(ConnData {connId}, sq :| _, pqEnc_, msgFlags, aMessage) = fmap (first storeError) $ runExceptT $ do
|
||||
internalTs <- liftIO getCurrentTime
|
||||
(internalId, internalSndId, prevMsgHash) <- liftIO $ updateSndIds db connId
|
||||
let privHeader = APrivHeader (unSndId internalSndId) prevMsgHash
|
||||
|
||||
Reference in New Issue
Block a user