mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-25 01:35:22 +00:00
committed by
GitHub
parent
9cd5f96fe4
commit
29332a5e9f
@@ -906,7 +906,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
|
||||
-- If initiating party were to send CON to the user without waiting for reply HELLO (to reduce handshake time),
|
||||
-- it would lead to the non-deterministic internal ID of the first sent message, at to some other race conditions,
|
||||
-- because it can be sent before HELLO is received
|
||||
-- With `status == Aclive` condition, CON is sent here only by the accepting party, that previously received HELLO
|
||||
-- With `status == Active` condition, CON is sent here only by the accepting party, that previously received HELLO
|
||||
when (status == Active) $ notify CON
|
||||
-- Party joining connection sends REPLY after HELLO in v1,
|
||||
-- it is an error to send REPLY in duplexHandshake mode (v2),
|
||||
@@ -1188,7 +1188,7 @@ sendNtfConnCommands c cmd = do
|
||||
ns <- asks ntfSupervisor
|
||||
connIds <- atomically $ getSubscriptions c
|
||||
forM_ connIds $ \connId -> do
|
||||
withStore' c (\db -> getConnData db connId) >>= \case
|
||||
withStore' c (`getConnData` connId) >>= \case
|
||||
Just (ConnData {enableNtfs}, _) ->
|
||||
when enableNtfs . atomically $ writeTBQueue (ntfSubQ ns) (connId, cmd)
|
||||
_ ->
|
||||
@@ -1232,7 +1232,7 @@ suspendAgent' c@AgentClient {agentState = as} maxDelay = do
|
||||
suspendSendingAndDatabase c
|
||||
|
||||
execAgentStoreSQL' :: AgentMonad m => AgentClient -> Text -> m [Text]
|
||||
execAgentStoreSQL' c sql = withStore' c (`exexSQL` sql)
|
||||
execAgentStoreSQL' c sql = withStore' c (`execSQL` sql)
|
||||
|
||||
getSMPServer :: AgentMonad m => AgentClient -> m SMPServer
|
||||
getSMPServer c = readTVarIO (smpServers c) >>= pickServer
|
||||
@@ -1391,7 +1391,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, v, sessId, rId, cm
|
||||
New -> case (conn, e2eEncryption) of
|
||||
-- party initiating connection
|
||||
(RcvConnection {}, Just e2eSndParams) -> do
|
||||
(pk1, rcDHRs) <- withStore c $ (`getRatchetX3dhKeys` connId)
|
||||
(pk1, rcDHRs) <- withStore c (`getRatchetX3dhKeys` connId)
|
||||
let rc = CR.initRcvRatchet rcDHRs $ CR.x3dhRcv pk1 rcDHRs e2eSndParams
|
||||
(agentMsgBody_, rc', skipped) <- liftError cryptoError $ CR.rcDecrypt rc M.empty encConnInfo
|
||||
case (agentMsgBody_, skipped) of
|
||||
|
||||
@@ -567,7 +567,7 @@ subscribeQueues c srv qs = do
|
||||
liftIO $ zip qs_ . L.toList <$> subscribeSMPQueues smp qs2
|
||||
forM_ rs' $ \((connId, rq), r) -> liftIO $ processSubResult c rq connId r
|
||||
pure $ map (bimap fst (first $ protocolClientError SMP)) rs'
|
||||
_ -> pure $ (Nothing, M.fromList errs)
|
||||
_ -> pure (Nothing, M.fromList errs)
|
||||
where
|
||||
checkQueue rq@(connId, RcvQueue {rcvId, server}) = do
|
||||
prohibited <- atomically . TM.member (server, rcvId) $ getMsgLocks c
|
||||
|
||||
@@ -69,7 +69,7 @@ data AgentDatabase
|
||||
|
||||
databaseFile :: AgentDatabase -> FilePath
|
||||
databaseFile = \case
|
||||
AgentDB (SQLiteStore {dbFilePath}) -> dbFilePath
|
||||
AgentDB SQLiteStore {dbFilePath} -> dbFilePath
|
||||
AgentDBFile {dbFile} -> dbFile
|
||||
|
||||
data AgentConfig = AgentConfig
|
||||
|
||||
@@ -735,7 +735,7 @@ instance StrEncoding SMPQueueUri where
|
||||
where
|
||||
query = strEncode . QSP QEscape
|
||||
queryParams = [("v", strEncode vr), ("dh", strEncode dhPublicKey)]
|
||||
srvParam = [("srv", strEncode $ TransportHosts_ hs) | length hs > 0]
|
||||
srvParam = [("srv", strEncode $ TransportHosts_ hs) | not (null hs)]
|
||||
hs = L.tail $ host srv
|
||||
strP = do
|
||||
srv@ProtocolServer {host = h :| host} <- strP <* A.char '/'
|
||||
|
||||
@@ -8,7 +8,6 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE StandaloneDeriving #-}
|
||||
{-# LANGUAGE TypeApplications #-}
|
||||
{-# OPTIONS_GHC -fno-warn-unticked-promoted-constructors #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Store where
|
||||
|
||||
@@ -24,7 +24,7 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
connectSQLiteStore,
|
||||
closeSQLiteStore,
|
||||
sqlString,
|
||||
exexSQL,
|
||||
execSQL,
|
||||
|
||||
-- * Queues and connections
|
||||
createNewConn,
|
||||
@@ -242,8 +242,8 @@ sqlString s = quote <> T.replace quote "''" (T.pack s) <> quote
|
||||
-- auto_vacuum <- DB.query_ db "PRAGMA auto_vacuum;" :: IO [[Int]]
|
||||
-- print $ path <> " auto_vacuum: " <> show auto_vacuum
|
||||
|
||||
exexSQL :: DB.Connection -> Text -> IO [Text]
|
||||
exexSQL db query = do
|
||||
execSQL :: DB.Connection -> Text -> IO [Text]
|
||||
execSQL db query = do
|
||||
rs <- newIORef []
|
||||
SQLite3.execWithCallback (DB.connectionHandle db) query (addSQLResultRow rs)
|
||||
reverse <$> readIORef rs
|
||||
|
||||
@@ -17,10 +17,9 @@ module Simplex.Messaging.Agent.Store.SQLite.Migrations
|
||||
where
|
||||
|
||||
import Control.Monad (forM_, when)
|
||||
import Data.List (intercalate, sortBy)
|
||||
import Data.List (intercalate, sortOn)
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import qualified Data.Map as M
|
||||
import Data.Ord (comparing)
|
||||
import Data.Text (Text)
|
||||
import Data.Text.Encoding (decodeLatin1)
|
||||
import Data.Time.Clock (getCurrentTime)
|
||||
@@ -57,7 +56,7 @@ schemaMigrations =
|
||||
|
||||
-- | The list of migrations in ascending order by date
|
||||
app :: [Migration]
|
||||
app = sortBy (comparing name) $ map migration schemaMigrations
|
||||
app = sortOn name $ map migration schemaMigrations
|
||||
where
|
||||
migration (name, query) = Migration {name = name, up = fromQuery query}
|
||||
|
||||
|
||||
@@ -105,7 +105,7 @@ data ProtocolClient msg = ProtocolClient
|
||||
tcpTimeout :: Int,
|
||||
clientCorrId :: TVar Natural,
|
||||
sentCommands :: TMap CorrId (Request msg),
|
||||
sndQ :: TBQueue (NonEmpty (SentRawTransmission)),
|
||||
sndQ :: TBQueue (NonEmpty SentRawTransmission),
|
||||
rcvQ :: TBQueue (NonEmpty (SignedTransmission msg)),
|
||||
msgQ :: Maybe (TBQueue (ServerTransmission msg))
|
||||
}
|
||||
|
||||
@@ -265,7 +265,7 @@ subscribeQueue ca srv sub = do
|
||||
|
||||
showServer :: SMPServer -> ByteString
|
||||
showServer ProtocolServer {host, port} =
|
||||
strEncode host <> (B.pack $ if null port then "" else ':' : port)
|
||||
strEncode host <> B.pack (if null port then "" else ':' : port)
|
||||
|
||||
smpSubscribe :: SMPClient -> (SMPSub, C.APrivateSignKey) -> ExceptT ProtocolClientError IO ()
|
||||
smpSubscribe smp ((party, queueId), privKey) = subscribe_ smp privKey queueId
|
||||
|
||||
@@ -387,7 +387,7 @@ instance StrEncoding DeviceToken where
|
||||
strP = DeviceToken <$> strP <* A.space <*> hexStringP
|
||||
where
|
||||
hexStringP =
|
||||
A.takeWhile (\c -> (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f')) >>= \s ->
|
||||
A.takeWhile (\c -> A.isDigit c || (c >= 'a' && c <= 'f')) >>= \s ->
|
||||
if even (B.length s) then pure s else fail "odd number of hex characters"
|
||||
|
||||
instance ToJSON DeviceToken where
|
||||
|
||||
@@ -7,7 +7,6 @@
|
||||
{-# LANGUAGE NumericUnderscores #-}
|
||||
{-# LANGUAGE OverloadedLists #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Types where
|
||||
|
||||
|
||||
@@ -927,7 +927,7 @@ instance ProtocolEncoding Cmd where
|
||||
CT SSender tag ->
|
||||
Cmd SSender <$> case tag of
|
||||
SEND_
|
||||
| v == 1 -> SEND <$> pure noMsgFlags <*> (unTail <$> _smpP)
|
||||
| v == 1 -> SEND noMsgFlags <$> (unTail <$> _smpP)
|
||||
| otherwise -> SEND <$> _smpP <*> (unTail <$> _smpP)
|
||||
PING_ -> pure PING
|
||||
CT SNotifier NSUB_ -> pure $ Cmd SNotifier NSUB
|
||||
@@ -1049,7 +1049,7 @@ instance Encoding CommandError where
|
||||
_ -> fail "bad command error type"
|
||||
|
||||
-- | Send signed SMP transmission to TCP transport.
|
||||
tPut :: Transport c => THandle c -> NonEmpty (SentRawTransmission) -> IO (NonEmpty (Either TransportError ()))
|
||||
tPut :: Transport c => THandle c -> NonEmpty SentRawTransmission -> IO (NonEmpty (Either TransportError ()))
|
||||
tPut th trs
|
||||
| batch th = tPutBatch [] $ L.map tEncode trs
|
||||
| otherwise = forM trs $ tPutBlock th . tEncode
|
||||
|
||||
@@ -8,7 +8,6 @@
|
||||
{-# LANGUAGE NumericUnderscores #-}
|
||||
{-# LANGUAGE OverloadedLists #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
{-# LANGUAGE RankNTypes #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user