Merge branch 'master' into ep/rfc-rotation

This commit is contained in:
Evgeny Poberezkin
2022-08-21 12:28:29 +01:00
16 changed files with 167 additions and 106 deletions
+8
View File
@@ -1,3 +1,11 @@
# 3.2.0
SMP agent:
- Support multiple server hostnames (including onion hostnames) in server addresses.
- Network configuration options.
- Options to define rules to choose server hostname.
# 3.1.0
SMP server and agent:
+1 -1
View File
@@ -45,7 +45,7 @@ ntfServerCLIConfig =
fingerprintFile = combine cfgPath "fingerprint",
defaultServerPort = "443",
executableName = "ntf-server",
serverVersion = "SMP notifications server v1.1.3",
serverVersion = "SMP notifications server v1.2.0",
mkIniFile = \enableStoreLog defaultServerPort ->
"[STORE_LOG]\n\
\# The server uses STM memory for persistence,\n\
+1 -1
View File
@@ -1,5 +1,5 @@
name: simplexmq
version: 3.1.0
version: 3.2.0
synopsis: SimpleXMQ message broker
description: |
This package includes <./docs/Simplex-Messaging-Server.html server>,
+2 -1
View File
@@ -5,7 +5,7 @@ cabal-version: 1.12
-- see: https://github.com/sol/hpack
name: simplexmq
version: 3.1.0
version: 3.2.0
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and
@@ -51,6 +51,7 @@ library
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220608_v2
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220625_v2_ntf_mode
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220811_onion_hosts
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220817_connection_ntfs
Simplex.Messaging.Client
Simplex.Messaging.Client.Agent
Simplex.Messaging.Crypto
+72 -50
View File
@@ -65,7 +65,7 @@ module Simplex.Messaging.Agent
deleteNtfToken,
getNtfToken,
getNtfTokenData,
deleteNtfSub,
toggleConnectionNtfs,
activateAgent,
suspendAgent,
logConnection,
@@ -91,7 +91,6 @@ import qualified Data.Text as T
import Data.Time.Clock
import Data.Time.Clock.System (systemToUTCTime)
import qualified Database.SQLite.Simple as DB
-- import GHC.Conc (unsafeIOToSTM)
import Simplex.Messaging.Agent.Client
import Simplex.Messaging.Agent.Env.SQLite
import Simplex.Messaging.Agent.NtfSubSupervisor
@@ -119,6 +118,8 @@ import UnliftIO.Concurrent (forkFinally, forkIO, threadDelay)
import qualified UnliftIO.Exception as E
import UnliftIO.STM
-- import GHC.Conc (unsafeIOToSTM)
-- | Creates an SMP agent client instance
getSMPAgentClient :: (MonadRandom m, MonadUnliftIO m) => AgentConfig -> InitialAgentServers -> m AgentClient
getSMPAgentClient cfg initServers = newSMPAgentEnv cfg >>= runReaderT runAgent
@@ -141,20 +142,20 @@ resumeAgentClient c = atomically $ writeTVar (active c) True
type AgentErrorMonad m = (MonadUnliftIO m, MonadError AgentErrorType m)
-- | Create SMP agent connection (NEW command)
createConnection :: AgentErrorMonad m => AgentClient -> SConnectionMode c -> m (ConnId, ConnectionRequestUri c)
createConnection c cMode = withAgentEnv c $ newConn c "" cMode
createConnection :: AgentErrorMonad m => AgentClient -> Bool -> SConnectionMode c -> m (ConnId, ConnectionRequestUri c)
createConnection c enableNtfs cMode = withAgentEnv c $ newConn c "" enableNtfs cMode
-- | Join SMP agent connection (JOIN command)
joinConnection :: AgentErrorMonad m => AgentClient -> ConnectionRequestUri c -> ConnInfo -> m ConnId
joinConnection c = withAgentEnv c .: joinConn c ""
joinConnection :: AgentErrorMonad m => AgentClient -> Bool -> ConnectionRequestUri c -> ConnInfo -> m ConnId
joinConnection c enableNtfs = withAgentEnv c .: joinConn c "" enableNtfs
-- | Allow connection to continue after CONF notification (LET command)
allowConnection :: AgentErrorMonad m => AgentClient -> ConnId -> ConfirmationId -> ConnInfo -> m ()
allowConnection c = withAgentEnv c .:. allowConnection' c
-- | Accept contact after REQ notification (ACPT command)
acceptContact :: AgentErrorMonad m => AgentClient -> ConfirmationId -> ConnInfo -> m ConnId
acceptContact c = withAgentEnv c .: acceptContact' c ""
acceptContact :: AgentErrorMonad m => AgentClient -> Bool -> ConfirmationId -> ConnInfo -> m ConnId
acceptContact c enableNtfs = withAgentEnv c .: acceptContact' c "" enableNtfs
-- | Reject contact (RJCT command)
rejectContact :: AgentErrorMonad m => AgentClient -> ConnId -> ConfirmationId -> m ()
@@ -240,9 +241,9 @@ getNtfToken c = withAgentEnv c $ getNtfToken' c
getNtfTokenData :: AgentErrorMonad m => AgentClient -> m NtfToken
getNtfTokenData c = withAgentEnv c $ getNtfTokenData' c
-- | Delete notification subscription for connection
deleteNtfSub :: AgentErrorMonad m => AgentClient -> ConnId -> m ()
deleteNtfSub c = withAgentEnv c . deleteNtfSub' c
-- | Set connection notifications on/off
toggleConnectionNtfs :: AgentErrorMonad m => AgentClient -> ConnId -> Bool -> m ()
toggleConnectionNtfs c = withAgentEnv c .: toggleConnectionNtfs' c
-- | Activate operations
activateAgent :: AgentErrorMonad m => AgentClient -> m ()
@@ -282,10 +283,10 @@ client c@AgentClient {rcvQ, subQ} = forever $ do
-- | execute any SMP agent command
processCommand :: forall m. AgentMonad m => AgentClient -> (ConnId, ACommand 'Client) -> m (ConnId, ACommand 'Agent)
processCommand c (connId, cmd) = case cmd of
NEW (ACM cMode) -> second (INV . ACR cMode) <$> newConn c connId cMode
JOIN (ACR _ cReq) connInfo -> (,OK) <$> joinConn c connId cReq connInfo
NEW (ACM cMode) -> second (INV . ACR cMode) <$> newConn c connId True cMode
JOIN (ACR _ cReq) connInfo -> (,OK) <$> joinConn c connId True cReq connInfo
LET confId ownCInfo -> allowConnection' c connId confId ownCInfo $> (connId, OK)
ACPT invId ownCInfo -> (,OK) <$> acceptContact' c connId invId ownCInfo
ACPT invId ownCInfo -> (,OK) <$> acceptContact' c connId True invId ownCInfo
RJCT invId -> rejectContact' c connId invId $> (connId, OK)
SUB -> subscribeConnection' c connId $> (connId, OK)
SEND msgFlags msgBody -> (connId,) . MID <$> sendMessage' c connId msgFlags msgBody
@@ -294,18 +295,19 @@ processCommand c (connId, cmd) = case cmd of
DEL -> deleteConnection' c connId $> (connId, OK)
CHK -> (connId,) . STAT <$> getConnectionServers' c connId
newConn :: AgentMonad m => AgentClient -> ConnId -> SConnectionMode c -> m (ConnId, ConnectionRequestUri c)
newConn c connId cMode = do
newConn :: AgentMonad m => AgentClient -> ConnId -> Bool -> SConnectionMode c -> m (ConnId, ConnectionRequestUri c)
newConn c connId enableNtfs cMode = do
srv <- getSMPServer c
clientVRange <- asks $ smpClientVRange . config
(rq, qUri) <- newRcvQueue c srv clientVRange
g <- asks idsDrg
connAgentVersion <- asks $ maxVersion . smpAgentVRange . config
let cData = ConnData {connId, connAgentVersion, duplexHandshake = Nothing} -- connection mode is determined by the accepting agent
let cData = ConnData {connId, connAgentVersion, enableNtfs, duplexHandshake = Nothing} -- connection mode is determined by the accepting agent
connId' <- withStore c $ \db -> createRcvConn db g cData rq cMode
addSubscription c rq connId'
ns <- asks ntfSupervisor
atomically $ sendNtfSubCommand ns (connId', NSCCreate)
when enableNtfs $ do
ns <- asks ntfSupervisor
atomically $ sendNtfSubCommand ns (connId', NSCCreate)
aVRange <- asks $ smpAgentVRange . config
let crData = ConnReqUriData simplexChat aVRange [qUri]
case cMode of
@@ -315,8 +317,8 @@ newConn c connId cMode = do
withStore' c $ \db -> createRatchetX3dhKeys db connId' pk1 pk2
pure (connId', CRInvitationUri crData $ toVersionRangeT e2eRcvParams CR.e2eEncryptVRange)
joinConn :: AgentMonad m => AgentClient -> ConnId -> ConnectionRequestUri c -> ConnInfo -> m ConnId
joinConn c connId (CRInvitationUri (ConnReqUriData _ agentVRange (qUri :| _)) e2eRcvParamsUri) cInfo = do
joinConn :: AgentMonad m => AgentClient -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> m ConnId
joinConn c connId enableNtfs (CRInvitationUri (ConnReqUriData _ agentVRange (qUri :| _)) e2eRcvParamsUri) cInfo = do
aVRange <- asks $ smpAgentVRange . config
clientVRange <- asks $ smpClientVRange . config
case ( qUri `compatibleVersion` clientVRange,
@@ -330,13 +332,13 @@ joinConn c connId (CRInvitationUri (ConnReqUriData _ agentVRange (qUri :| _)) e2
sq <- newSndQueue qInfo
g <- asks idsDrg
let duplexHS = connAgentVersion /= 1
cData = ConnData {connId, connAgentVersion, duplexHandshake = Just duplexHS}
cData = ConnData {connId, connAgentVersion, enableNtfs, duplexHandshake = Just duplexHS}
connId' <- withStore c $ \db -> runExceptT $ do
connId' <- ExceptT $ createSndConn db g cData sq
liftIO $ createRatchet db connId' rc
pure connId'
let cData' = (cData :: ConnData) {connId = connId'}
tryError (confirmQueue aVersion c connId' sq cInfo $ Just e2eSndParams) >>= \case
tryError (confirmQueue aVersion c cData' sq cInfo $ Just e2eSndParams) >>= \case
Right _ -> do
unless duplexHS . void $ enqueueMessage c cData' sq SMP.noMsgFlags HELLO
pure connId'
@@ -345,27 +347,28 @@ joinConn c connId (CRInvitationUri (ConnReqUriData _ agentVRange (qUri :| _)) e2
withStore' c (`deleteConn` connId')
throwError e
_ -> throwError $ AGENT A_VERSION
joinConn c connId (CRContactUri (ConnReqUriData _ agentVRange (qUri :| _))) cInfo = do
joinConn c connId enableNtfs (CRContactUri (ConnReqUriData _ agentVRange (qUri :| _))) cInfo = do
aVRange <- asks $ smpAgentVRange . config
clientVRange <- asks $ smpClientVRange . config
case ( qUri `compatibleVersion` clientVRange,
agentVRange `compatibleVersion` aVRange
) of
(Just qInfo, Just vrsn) -> do
(connId', cReq) <- newConn c connId SCMInvitation
(connId', cReq) <- newConn c connId enableNtfs SCMInvitation
sendInvitation c qInfo vrsn cReq cInfo
pure connId'
_ -> throwError $ AGENT A_VERSION
createReplyQueue :: AgentMonad m => AgentClient -> ConnId -> SndQueue -> m SMPQueueInfo
createReplyQueue c connId SndQueue {smpClientVersion} = do
createReplyQueue :: AgentMonad m => AgentClient -> ConnData -> SndQueue -> m SMPQueueInfo
createReplyQueue c ConnData {connId, enableNtfs} SndQueue {smpClientVersion} = do
srv <- getSMPServer c
(rq, qUri) <- newRcvQueue c srv $ versionToRange smpClientVersion
let qInfo = toVersionT qUri smpClientVersion
addSubscription c rq connId
withStore c $ \db -> upgradeSndConnToDuplex db connId rq
ns <- asks ntfSupervisor
atomically $ sendNtfSubCommand ns (connId, NSCCreate)
when enableNtfs $ do
ns <- asks ntfSupervisor
atomically $ sendNtfSubCommand ns (connId, NSCCreate)
pure qInfo
-- | Approve confirmation (LET command) in Reader monad
@@ -382,13 +385,13 @@ allowConnection' c connId confId ownConnInfo =
_ -> throwError $ CMD PROHIBITED
-- | Accept contact (ACPT command) in Reader monad
acceptContact' :: AgentMonad m => AgentClient -> ConnId -> InvitationId -> ConnInfo -> m ConnId
acceptContact' c connId invId ownConnInfo = do
acceptContact' :: AgentMonad m => AgentClient -> ConnId -> Bool -> InvitationId -> ConnInfo -> m ConnId
acceptContact' c connId enableNtfs invId ownConnInfo = do
Invitation {contactConnId, connReq} <- withStore c (`getInvitation` invId)
withStore c (`getConn` contactConnId) >>= \case
SomeConn _ ContactConnection {} -> do
withStore' c $ \db -> acceptInvitation db invId ownConnInfo
joinConn c connId connReq ownConnInfo
joinConn c connId enableNtfs connReq ownConnInfo
_ -> throwError $ CMD PROHIBITED
-- | Reject contact (RJCT command) in Reader monad
@@ -672,7 +675,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {connId, duplexHandsh
-- and this branch should never be reached as receive is created before the confirmation,
-- so the condition is not necessary here, strictly speaking.
_ -> unless (duplexHandshake == Just True) $ do
qInfo <- createReplyQueue c connId sq
qInfo <- createReplyQueue c cData sq
void . enqueueMessage c cData sq SMP.noMsgFlags $ REPLY [qInfo]
AM_A_MSG_ -> notify $ SENT mId
_ -> pure ()
@@ -773,7 +776,7 @@ registerNtfToken' c suppliedDeviceToken suppliedNtfMode =
cron <- asks $ ntfCron . config
agentNtfEnableCron c tknId tkn cron
when (suppliedNtfMode == NMInstant) $ initializeNtfSubs c
when (suppliedNtfMode == NMPeriodic && savedNtfMode == NMInstant) $ smpDeleteNtfSubs c
when (suppliedNtfMode == NMPeriodic && savedNtfMode == NMInstant) $ deleteNtfSubs c NSCDelete
pure ntfTknStatus -- TODO
-- agentNtfCheckToken c tknId tkn >>= \case
| otherwise -> replaceToken tknId $> NTRegistered
@@ -844,7 +847,7 @@ deleteNtfToken' c deviceToken =
Just tkn@NtfToken {deviceToken = savedDeviceToken} -> do
when (deviceToken /= savedDeviceToken) . throwError $ CMD PROHIBITED
deleteToken_ c tkn
smpDeleteNtfSubs c
deleteNtfSubs c NSCSmpDelete
_ -> throwError $ CMD PROHIBITED
getNtfToken' :: AgentMonad m => AgentClient -> m (DeviceToken, NtfTknStatus, NotificationsMode)
@@ -859,11 +862,23 @@ getNtfTokenData' c =
Just tkn -> pure tkn
_ -> throwError $ CMD PROHIBITED
-- | Delete notification subscription for connection, in Reader monad
deleteNtfSub' :: AgentMonad m => AgentClient -> ConnId -> m ()
deleteNtfSub' _c connId = do
ns <- asks ntfSupervisor
atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCDelete)
-- | Set connection notifications, in Reader monad
toggleConnectionNtfs' :: forall m. AgentMonad m => AgentClient -> ConnId -> Bool -> m ()
toggleConnectionNtfs' c connId enable = do
withStore c (`getConn` connId) >>= \case
SomeConn _ (DuplexConnection cData _ _) -> toggle cData
SomeConn _ (RcvConnection cData _) -> toggle cData
SomeConn _ (ContactConnection cData _) -> toggle cData
_ -> throwError $ CONN SIMPLEX
where
toggle :: ConnData -> m ()
toggle cData
| enableNtfs cData == enable = pure ()
| otherwise = do
withStore' c $ \db -> setConnectionNtfs db connId enable
ns <- asks ntfSupervisor
let cmd = if enable then NSCCreate else NSCDelete
atomically $ sendNtfSubCommand ns (connId, cmd)
deleteToken_ :: AgentMonad m => AgentClient -> NtfToken -> m ()
deleteToken_ c tkn@NtfToken {ntfTokenId, ntfTknStatus} = do
@@ -898,17 +913,24 @@ withToken c tkn@NtfToken {deviceToken, ntfMode} from_ (toStatus, toAction_) f =
Left e -> throwError e
initializeNtfSubs :: AgentMonad m => AgentClient -> m ()
initializeNtfSubs c = do
ns <- asks ntfSupervisor
connIds <- atomically $ getSubscriptions c
forM_ connIds $ \connId -> atomically $ sendNtfSubCommand ns (connId, NSCCreate)
initializeNtfSubs c = sendNtfConnCommands c NSCCreate
smpDeleteNtfSubs :: AgentMonad m => AgentClient -> m ()
smpDeleteNtfSubs c = do
deleteNtfSubs :: AgentMonad m => AgentClient -> NtfSupervisorCommand -> m ()
deleteNtfSubs c deleteCmd = do
ns <- asks ntfSupervisor
void . atomically . flushTBQueue $ ntfSubQ ns
sendNtfConnCommands c deleteCmd
sendNtfConnCommands :: AgentMonad m => AgentClient -> NtfSupervisorCommand -> m ()
sendNtfConnCommands c cmd = do
ns <- asks ntfSupervisor
connIds <- atomically $ getSubscriptions c
forM_ connIds $ \connId -> atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCSmpDelete)
forM_ connIds $ \connId -> do
withStore' c (\db -> getConnData db connId) >>= \case
Just (ConnData {enableNtfs}, _) ->
when enableNtfs . atomically $ writeTBQueue (ntfSubQ ns) (connId, cmd)
_ ->
atomically $ writeTBQueue (subQ c) ("", connId, ERR $ INTERNAL "no connection data")
-- TODO
-- There should probably be another function to cancel all subscriptions that would flush the queue first,
@@ -1193,8 +1215,8 @@ connectReplyQueues c cData@ConnData {connId} ownConnInfo (qInfo :| _) = do
withStore c $ \db -> upgradeRcvConnToDuplex db connId sq
enqueueConfirmation c cData sq ownConnInfo Nothing
confirmQueue :: forall m. AgentMonad m => Compatible Version -> AgentClient -> ConnId -> SndQueue -> ConnInfo -> Maybe (CR.E2ERatchetParams 'C.X448) -> m ()
confirmQueue (Compatible agentVersion) c connId sq connInfo e2eEncryption = do
confirmQueue :: forall m. AgentMonad m => Compatible Version -> AgentClient -> ConnData -> SndQueue -> ConnInfo -> Maybe (CR.E2ERatchetParams 'C.X448) -> m ()
confirmQueue (Compatible agentVersion) c cData@ConnData {connId} sq connInfo e2eEncryption = do
aMessage <- mkAgentMessage agentVersion
msg <- mkConfirmation aMessage
sendConfirmation c sq msg
@@ -1208,7 +1230,7 @@ confirmQueue (Compatible agentVersion) c connId sq connInfo e2eEncryption = do
mkAgentMessage :: Version -> m AgentMessage
mkAgentMessage 1 = pure $ AgentConnInfo connInfo
mkAgentMessage _ = do
qInfo <- createReplyQueue c connId sq
qInfo <- createReplyQueue c cData sq
pure $ AgentConnInfoReply (qInfo :| []) connInfo
enqueueConfirmation :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> ConnInfo -> Maybe (CR.E2ERatchetParams 'C.X448) -> m ()
+1
View File
@@ -98,6 +98,7 @@ module Simplex.Messaging.Agent.Protocol
queueStatusT,
agentMessageType,
extraSMPServerHosts,
updateSMPServerHosts,
-- * TCP transport functions
tPut,
+1
View File
@@ -151,6 +151,7 @@ deriving instance Show SomeConn
data ConnData = ConnData
{ connId :: ConnId,
connAgentVersion :: Version,
enableNtfs :: Bool,
duplexHandshake :: Maybe Bool -- added in agent protocol v2
}
deriving (Eq, Show)
+15 -9
View File
@@ -27,6 +27,7 @@ module Simplex.Messaging.Agent.Store.SQLite
createRcvConn,
createSndConn,
getConn,
getConnData,
getRcvConn,
deleteConn,
upgradeRcvConnToDuplex,
@@ -84,6 +85,7 @@ module Simplex.Messaging.Agent.Store.SQLite
getNextNtfSubSMPAction,
getActiveNtfToken,
getNtfRcvQueue,
setConnectionNtfs,
-- * utilities
withConnection,
@@ -256,17 +258,17 @@ createConn_ gVar cData create = checkConstraint SEConnDuplicate $ case cData of
ConnData {connId} -> create connId $> Right connId
createRcvConn :: DB.Connection -> TVar ChaChaDRG -> ConnData -> RcvQueue -> SConnectionMode c -> IO (Either StoreError ConnId)
createRcvConn db gVar cData q@RcvQueue {server} cMode =
createRcvConn db gVar cData@ConnData {connAgentVersion, enableNtfs, duplexHandshake} q@RcvQueue {server} cMode =
createConn_ gVar cData $ \connId -> do
upsertServer_ db server
DB.execute db "INSERT INTO connections (conn_id, conn_mode, smp_agent_version, duplex_handshake) VALUES (?, ?, ?, ?)" (connId, cMode, connAgentVersion cData, duplexHandshake cData)
DB.execute db "INSERT INTO connections (conn_id, conn_mode, smp_agent_version, enable_ntfs, duplex_handshake) VALUES (?, ?, ?, ?, ?)" (connId, cMode, connAgentVersion, enableNtfs, duplexHandshake)
insertRcvQueue_ db connId q
createSndConn :: DB.Connection -> TVar ChaChaDRG -> ConnData -> SndQueue -> IO (Either StoreError ConnId)
createSndConn db gVar cData q@SndQueue {server} =
createSndConn db gVar cData@ConnData {connAgentVersion, enableNtfs, duplexHandshake} q@SndQueue {server} =
createConn_ gVar cData $ \connId -> do
upsertServer_ db server
DB.execute db "INSERT INTO connections (conn_id, conn_mode, smp_agent_version, duplex_handshake) VALUES (?, ?, ?, ?)" (connId, SCMInvitation, connAgentVersion cData, duplexHandshake cData)
DB.execute db "INSERT INTO connections (conn_id, conn_mode, smp_agent_version, enable_ntfs, duplex_handshake) VALUES (?, ?, ?, ?, ?)" (connId, SCMInvitation, connAgentVersion, enableNtfs, duplexHandshake)
insertSndQueue_ db connId q
getRcvConn :: DB.Connection -> SMPServer -> SMP.RecipientId -> IO (Either StoreError SomeConn)
@@ -947,6 +949,10 @@ getNtfRcvQueue db SMPQueueNtf {smpServer = (SMPServer host port _), notifierId}
res (connId, Just rcvNtfDhSecret) = Right (connId, rcvNtfDhSecret)
res _ = Left SEConnNotFound
setConnectionNtfs :: DB.Connection -> ConnId -> Bool -> IO ()
setConnectionNtfs db connId enableNtfs =
DB.execute db "UPDATE connections SET enable_ntfs = ? WHERE conn_id = ?" (enableNtfs, connId)
-- * Auxiliary helpers
instance ToField QueueStatus where toField = toField . serializeQueueStatus
@@ -1102,7 +1108,7 @@ insertSndQueue_ dbConn connId SndQueue {..} = do
getConn :: DB.Connection -> ConnId -> IO (Either StoreError SomeConn)
getConn dbConn connId =
getConnData_ dbConn connId >>= \case
getConnData dbConn connId >>= \case
Nothing -> pure $ Left SEConnNotFound
Just (connData, cMode) -> do
rQ <- getRcvQueueByConnId_ dbConn connId
@@ -1114,12 +1120,12 @@ getConn dbConn connId =
(Just rcvQ, Nothing, CMContact) -> Right $ SomeConn SCContact (ContactConnection connData rcvQ)
_ -> Left SEConnNotFound
getConnData_ :: DB.Connection -> ConnId -> IO (Maybe (ConnData, ConnectionMode))
getConnData_ dbConn connId' =
getConnData :: DB.Connection -> ConnId -> IO (Maybe (ConnData, ConnectionMode))
getConnData dbConn connId' =
connData
<$> DB.query dbConn "SELECT conn_id, conn_mode, smp_agent_version, duplex_handshake FROM connections WHERE conn_id = ?;" (Only connId')
<$> DB.query dbConn "SELECT conn_id, conn_mode, smp_agent_version, enable_ntfs, duplex_handshake FROM connections WHERE conn_id = ?;" (Only connId')
where
connData [(connId, cMode, connAgentVersion, duplexHandshake)] = Just (ConnData {connId, connAgentVersion, duplexHandshake}, cMode)
connData [(connId, cMode, connAgentVersion, enableNtfs_, duplexHandshake)] = Just (ConnData {connId, connAgentVersion, enableNtfs = fromMaybe True enableNtfs_, duplexHandshake}, cMode)
connData _ = Nothing
getRcvQueueByConnId_ :: DB.Connection -> ConnId -> IO (Maybe RcvQueue)
@@ -16,7 +16,7 @@ module Simplex.Messaging.Agent.Store.SQLite.Migrations
)
where
import Control.Monad (forM_)
import Control.Monad (forM_, when)
import Data.List (intercalate, sortBy)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.Map as M
@@ -35,6 +35,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220322_notifications
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220608_v2
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220625_v2_ntf_mode
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220811_onion_hosts
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220817_connection_ntfs
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Transport.Client (TransportHost)
@@ -48,7 +49,8 @@ schemaMigrations =
("20220322_notifications", m20220322_notifications),
("20220607_v2", m20220608_v2),
("m20220625_v2_ntf_mode", m20220625_v2_ntf_mode),
("m20220811_onion_hosts", m20220811_onion_hosts)
("m20220811_onion_hosts", m20220811_onion_hosts),
("m20220817_connection_ntfs", m20220817_connection_ntfs)
]
-- | The list of migrations in ascending order by date
@@ -63,16 +65,16 @@ get conn migrations =
<$> DB.query_ conn "SELECT name FROM migrations ORDER BY name ASC;"
run :: Connection -> [Migration] -> IO ()
run conn ms = DB.withImmediateTransaction conn . forM_ ms $
\Migration {name, up} -> insert name >> execSQL up >> updateServers name
run conn ms = forM_ ms $ \Migration {name, up} -> do
when (name == "m20220811_onion_hosts") updateServers
DB.withImmediateTransaction conn $ insert name >> execSQL up
where
insert name = DB.execute conn "INSERT INTO migrations (name, ts) VALUES (?, ?);" . (name,) =<< getCurrentTime
execSQL = SQLite3.exec $ DB.connectionHandle conn
updateServers = \case
"m20220811_onion_hosts" -> forM_ (M.assocs extraSMPServerHosts) $ \(h, h') ->
updateServers = forM_ (M.assocs extraSMPServerHosts) $ \(h, h') ->
DB.withImmediateTransaction conn $
let hs = decodeLatin1 . strEncode $ ([h, h'] :: NonEmpty TransportHost)
in DB.execute conn "UPDATE servers SET host = ? WHERE host = ?" (hs, decodeLatin1 $ strEncode h)
_ -> pure ()
initialize :: Connection -> IO ()
initialize conn =
@@ -0,0 +1,12 @@
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220817_connection_ntfs where
import Database.SQLite.Simple (Query)
import Database.SQLite.Simple.QQ (sql)
m20220817_connection_ntfs :: Query
m20220817_connection_ntfs =
[sql|
ALTER TABLE connections ADD COLUMN enable_ntfs INTEGER;
|]
@@ -20,7 +20,8 @@ CREATE TABLE connections(
last_snd_msg_hash BLOB NOT NULL DEFAULT x'',
smp_agent_version INTEGER NOT NULL DEFAULT 1
,
duplex_handshake INTEGER NULL DEFAULT 0
duplex_handshake INTEGER NULL DEFAULT 0,
enable_ntfs INTEGER
) WITHOUT ROWID;
CREATE TABLE rcv_queues(
host TEXT NOT NULL,
@@ -23,6 +23,7 @@ import Data.Type.Equality
import Data.Word (Word16)
import Database.SQLite.Simple.FromField (FromField (..))
import Database.SQLite.Simple.ToField (ToField (..))
import Simplex.Messaging.Agent.Protocol (updateSMPServerHosts)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
@@ -332,12 +333,16 @@ data SMPQueueNtf = SMPQueueNtf
instance Encoding SMPQueueNtf where
smpEncode SMPQueueNtf {smpServer, notifierId} = smpEncode (smpServer, notifierId)
smpP = do
(smpServer, notifierId) <- smpP
pure $ SMPQueueNtf {smpServer, notifierId}
smpServer <- updateSMPServerHosts <$> smpP
notifierId <- smpP
pure SMPQueueNtf {smpServer, notifierId}
instance StrEncoding SMPQueueNtf where
strEncode SMPQueueNtf {smpServer, notifierId} = strEncode smpServer <> "/" <> strEncode notifierId
strP = SMPQueueNtf <$> strP <* A.char '/' <*> strP
strP = do
smpServer <- updateSMPServerHosts <$> strP
notifierId <- A.char '/' *> strP
pure SMPQueueNtf {smpServer, notifierId}
data PushProvider = PPApnsDev | PPApnsProd | PPApnsTest
deriving (Eq, Ord, Show)
+1 -1
View File
@@ -99,7 +99,7 @@ supportedSMPServerVRange :: VersionRange
supportedSMPServerVRange = mkVersionRange 1 4
simplexMQVersion :: String
simplexMQVersion = "3.1.3"
simplexMQVersion = "3.2.0"
-- * Transport connection class
+20 -20
View File
@@ -166,8 +166,8 @@ testAgentClientContactV2toV1 = do
runAgentClientTest :: AgentClient -> AgentClient -> AgentMsgId -> IO ()
runAgentClientTest alice bob baseId = do
Right () <- runExceptT $ do
(bobId, qInfo) <- createConnection alice SCMInvitation
aliceId <- joinConnection bob qInfo "bob's connInfo"
(bobId, qInfo) <- createConnection alice True SCMInvitation
aliceId <- joinConnection bob True qInfo "bob's connInfo"
("", _, CONF confId _ "bob's connInfo") <- get alice
allowConnection alice bobId confId "alice's connInfo"
get alice ##> ("", bobId, CON)
@@ -202,10 +202,10 @@ runAgentClientTest alice bob baseId = do
runAgentClientContactTest :: AgentClient -> AgentClient -> AgentMsgId -> IO ()
runAgentClientContactTest alice bob baseId = do
Right () <- runExceptT $ do
(_, qInfo) <- createConnection alice SCMContact
aliceId <- joinConnection bob qInfo "bob's connInfo"
(_, qInfo) <- createConnection alice True SCMContact
aliceId <- joinConnection bob True qInfo "bob's connInfo"
("", _, REQ invId _ "bob's connInfo") <- get alice
bobId <- acceptContact alice invId "alice's connInfo"
bobId <- acceptContact alice True invId "alice's connInfo"
("", _, CONF confId _ "alice's connInfo") <- get bob
allowConnection bob aliceId confId "bob's connInfo"
get alice ##> ("", bobId, INFO "bob's connInfo")
@@ -250,9 +250,9 @@ testAsyncInitiatingOffline = do
alice <- getSMPAgentClient agentCfg initAgentServers
bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers
Right () <- runExceptT $ do
(bobId, cReq) <- createConnection alice SCMInvitation
(bobId, cReq) <- createConnection alice True SCMInvitation
disconnectAgentClient alice
aliceId <- joinConnection bob cReq "bob's connInfo"
aliceId <- joinConnection bob True cReq "bob's connInfo"
alice' <- liftIO $ getSMPAgentClient agentCfg initAgentServers
subscribeConnection alice' bobId
("", _, CONF confId _ "bob's connInfo") <- get alice'
@@ -268,8 +268,8 @@ testAsyncJoiningOfflineBeforeActivation = do
alice <- getSMPAgentClient agentCfg initAgentServers
bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers
Right () <- runExceptT $ do
(bobId, qInfo) <- createConnection alice SCMInvitation
aliceId <- joinConnection bob qInfo "bob's connInfo"
(bobId, qInfo) <- createConnection alice True SCMInvitation
aliceId <- joinConnection bob True qInfo "bob's connInfo"
disconnectAgentClient bob
("", _, CONF confId _ "bob's connInfo") <- get alice
allowConnection alice bobId confId "alice's connInfo"
@@ -286,9 +286,9 @@ testAsyncBothOffline = do
alice <- getSMPAgentClient agentCfg initAgentServers
bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers
Right () <- runExceptT $ do
(bobId, cReq) <- createConnection alice SCMInvitation
(bobId, cReq) <- createConnection alice True SCMInvitation
disconnectAgentClient alice
aliceId <- joinConnection bob cReq "bob's connInfo"
aliceId <- joinConnection bob True cReq "bob's connInfo"
disconnectAgentClient bob
alice' <- liftIO $ getSMPAgentClient agentCfg initAgentServers
subscribeConnection alice' bobId
@@ -308,9 +308,9 @@ testAsyncServerOffline t = do
bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers
-- create connection and shutdown the server
Right (bobId, cReq) <- withSmpServerStoreLogOn t testPort $ \_ ->
runExceptT $ createConnection alice SCMInvitation
runExceptT $ createConnection alice True SCMInvitation
-- connection fails
Left (BROKER NETWORK) <- runExceptT $ joinConnection bob cReq "bob's connInfo"
Left (BROKER NETWORK) <- runExceptT $ joinConnection bob True cReq "bob's connInfo"
("", "", DOWN srv conns) <- get alice
srv `shouldBe` testSMPServer
conns `shouldBe` [bobId]
@@ -320,7 +320,7 @@ testAsyncServerOffline t = do
liftIO $ do
srv1 `shouldBe` testSMPServer
conns1 `shouldBe` [bobId]
aliceId <- joinConnection bob cReq "bob's connInfo"
aliceId <- joinConnection bob True cReq "bob's connInfo"
("", _, CONF confId _ "bob's connInfo") <- get alice
allowConnection alice bobId confId "alice's connInfo"
get alice ##> ("", bobId, CON)
@@ -335,9 +335,9 @@ testAsyncHelloTimeout = do
alice <- getSMPAgentClient agentCfgV1 initAgentServers
bob <- getSMPAgentClient agentCfg {dbFile = testDB2, helloTimeout = 1} initAgentServers
Right () <- runExceptT $ do
(_, cReq) <- createConnection alice SCMInvitation
(_, cReq) <- createConnection alice True SCMInvitation
disconnectAgentClient alice
aliceId <- joinConnection bob cReq "bob's connInfo"
aliceId <- joinConnection bob True cReq "bob's connInfo"
get bob ##> ("", aliceId, ERR $ CONN NOT_ACCEPTED)
pure ()
@@ -392,8 +392,8 @@ testDuplicateMessage t = do
makeConnection :: AgentClient -> AgentClient -> ExceptT AgentErrorType IO (ConnId, ConnId)
makeConnection alice bob = do
(bobId, qInfo) <- createConnection alice SCMInvitation
aliceId <- joinConnection bob qInfo "bob's connInfo"
(bobId, qInfo) <- createConnection alice True SCMInvitation
aliceId <- joinConnection bob True qInfo "bob's connInfo"
("", _, CONF confId _ "bob's connInfo") <- get alice
allowConnection alice bobId confId "alice's connInfo"
get alice ##> ("", bobId, CON)
@@ -407,7 +407,7 @@ testInactiveClientDisconnected t = do
withSmpServerConfigOn t cfg' testPort $ \_ -> do
alice <- getSMPAgentClient agentCfg initAgentServers
Right () <- runExceptT $ do
(connId, _cReq) <- createConnection alice SCMInvitation
(connId, _cReq) <- createConnection alice True SCMInvitation
get alice ##> ("", "", DOWN testSMPServer [connId])
pure ()
@@ -418,7 +418,7 @@ testActiveClientNotDisconnected t = do
alice <- getSMPAgentClient agentCfg initAgentServers
ts <- getSystemTime
Right () <- runExceptT $ do
(connId, _cReq) <- createConnection alice SCMInvitation
(connId, _cReq) <- createConnection alice True SCMInvitation
keepSubscribing alice connId ts
pure ()
where
+13 -11
View File
@@ -210,8 +210,8 @@ testNotificationSubscriptionExistingConnection APNSMockServer {apnsQ} = do
bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers
Right (bobId, aliceId, nonce, message) <- runExceptT $ do
-- establish connection
(bobId, qInfo) <- createConnection alice SCMInvitation
aliceId <- joinConnection bob qInfo "bob's connInfo"
(bobId, qInfo) <- createConnection alice True SCMInvitation
aliceId <- joinConnection bob True qInfo "bob's connInfo"
("", _, CONF confId _ "bob's connInfo") <- get alice
allowConnection alice bobId confId "alice's connInfo"
get bob ##> ("", aliceId, INFO "alice's connInfo")
@@ -249,7 +249,8 @@ testNotificationSubscriptionExistingConnection APNSMockServer {apnsQ} = do
get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False
ackMessage alice bobId $ baseId + 1
-- delete notification subscription
deleteNtfSub alice bobId
toggleConnectionNtfs alice bobId False
liftIO $ threadDelay 250000
-- send message
2 <- msgId <$> sendMessage bob aliceId (SMP.MsgFlags True) "hello again"
get bob ##> ("", aliceId, SENT $ baseId + 2)
@@ -271,9 +272,10 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do
_ <- registerTestToken bob "bcde" NMInstant apnsQ
-- establish connection
liftIO $ threadDelay 50000
(bobId, qInfo) <- createConnection alice SCMInvitation
liftIO $ threadDelay 500000
aliceId <- joinConnection bob qInfo "bob's connInfo"
(bobId, qInfo) <- createConnection alice True SCMInvitation
liftIO $ threadDelay 1000000
aliceId <- joinConnection bob True qInfo "bob's connInfo"
liftIO $ threadDelay 750000
liftIO $ print 0
void $ messageNotification apnsQ
("", _, CONF confId _ "bob's connInfo") <- get alice
@@ -328,8 +330,8 @@ testChangeNotificationsMode APNSMockServer {apnsQ} = do
bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers
Right () <- runExceptT $ do
-- establish connection
(bobId, qInfo) <- createConnection alice SCMInvitation
aliceId <- joinConnection bob qInfo "bob's connInfo"
(bobId, qInfo) <- createConnection alice True SCMInvitation
aliceId <- joinConnection bob True qInfo "bob's connInfo"
("", _, CONF confId _ "bob's connInfo") <- get alice
allowConnection alice bobId confId "alice's connInfo"
get bob ##> ("", aliceId, INFO "alice's connInfo")
@@ -347,7 +349,7 @@ testChangeNotificationsMode APNSMockServer {apnsQ} = do
-- set mode to NMPeriodic
NTActive <- registerNtfToken alice tkn NMPeriodic
-- send message, no notification
liftIO $ threadDelay 500000
liftIO $ threadDelay 750000
2 <- msgId <$> sendMessage bob aliceId (SMP.MsgFlags True) "hello again"
get bob ##> ("", aliceId, SENT $ baseId + 2)
noNotification apnsQ
@@ -393,8 +395,8 @@ testChangeToken APNSMockServer {apnsQ} = do
bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers
Right (aliceId, bobId) <- runExceptT $ do
-- establish connection
(bobId, qInfo) <- createConnection alice SCMInvitation
aliceId <- joinConnection bob qInfo "bob's connInfo"
(bobId, qInfo) <- createConnection alice True SCMInvitation
aliceId <- joinConnection bob True qInfo "bob's connInfo"
("", _, CONF confId _ "bob's connInfo") <- get alice
allowConnection alice bobId confId "alice's connInfo"
get bob ##> ("", aliceId, INFO "alice's connInfo")
+1 -1
View File
@@ -139,7 +139,7 @@ testForeignKeysEnabled =
`shouldThrow` (\e -> DB.sqlError e == DB.ErrorConstraint)
cData1 :: ConnData
cData1 = ConnData {connId = "conn1", connAgentVersion = 1, duplexHandshake = Nothing}
cData1 = ConnData {connId = "conn1", connAgentVersion = 1, enableNtfs = True, duplexHandshake = Nothing}
testPrivateSignKey :: C.APrivateSignKey
testPrivateSignKey = C.APrivateSignKey C.SEd25519 "MC4CAQAwBQYDK2VwBCIEIDfEfevydXXfKajz3sRkcQ7RPvfWUPoq6pu1TYHV1DEe"