Files
simplexmq/src/Simplex/Messaging/Agent/Env/SQLite.hs
Evgeny Poberezkin eb5c1c78cb connection queue redundancy and rotation (#521)
* rfc: queue rotation

* update rfc

* messages for queue rotation

* allow multiple subscribed queues per connection in Agent/Client.hs

* refactor

* fix module name

* allow multiple queues in duplex connection type

* update commands

* add indices

* addConnectionRcvQueue

* switch connection to another queue (WIP)

* update schema/protocol

* switching queue works, but sending messages after the switch fails

* messages are delivered after rotation

* use connection-scoped queue ID

* rename queue records fields

* refactor using SMPQueue class/instances

* simplify queries

* QKEY: check queue is not secured, refactor

* update rfc

* mark queue as primary in QUSE

* queue rotation errors

* fix async ack

* fix async ACK to send OK

* correction

Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com>

* use SWCH command

* rename

* take into account only active queue subscription when determining connection result if at least one queue is active

* remove comment

* only enable notifications for connections with enableNtfs = True

* async test (WIP)

* async queue rotation test

* simplify combining results

* test with 2 servers

* fix unused subscribeConnection

* switch to cabal build

* increase build timeout

* increase delay in async test

* skip queue rotation tests

* build matrix

* step name

* use ubuntu-18.04 in build matrix

* enable rotation tests

Co-authored-by: JRoberts <8711996+jr-simplex@users.noreply.github.com>
2022-10-29 18:57:01 +01:00

190 lines
6.3 KiB
Haskell

{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-unticked-promoted-constructors #-}
module Simplex.Messaging.Agent.Env.SQLite
( AgentMonad,
AgentConfig (..),
AgentDatabase (..),
databaseFile,
InitialAgentServers (..),
NetworkConfig (..),
defaultAgentConfig,
defaultReconnectInterval,
Env (..),
newSMPAgentEnv,
createAgentStore,
NtfSupervisor (..),
NtfSupervisorCommand (..),
)
where
import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Reader
import Crypto.Random
import Data.List.NonEmpty (NonEmpty)
import Data.Time.Clock (NominalDiffTime, nominalDay)
import Data.Word (Word16)
import Network.Socket
import Numeric.Natural
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Store.SQLite
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
import Simplex.Messaging.Client
import Simplex.Messaging.Client.Agent ()
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.Ratchet (supportedE2EEncryptVRange)
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Protocol (NtfServer, supportedSMPClientVRange)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (TLS, Transport (..))
import Simplex.Messaging.Transport.Client (defaultSMPPort)
import Simplex.Messaging.Version
import System.Random (StdGen, newStdGen)
import UnliftIO (Async)
import UnliftIO.STM
-- | Agent monad with MonadReader Env and MonadError AgentErrorType
type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorType m)
data InitialAgentServers = InitialAgentServers
{ smp :: NonEmpty SMPServer,
ntf :: [NtfServer],
netCfg :: NetworkConfig
}
data AgentDatabase
= AgentDB SQLiteStore
| AgentDBFile {dbFile :: FilePath, dbKey :: String}
databaseFile :: AgentDatabase -> FilePath
databaseFile = \case
AgentDB SQLiteStore {dbFilePath} -> dbFilePath
AgentDBFile {dbFile} -> dbFile
data AgentConfig = AgentConfig
{ tcpPort :: ServiceName,
cmdSignAlg :: C.SignAlg,
connIdBytes :: Int,
tbqSize :: Natural,
database :: AgentDatabase,
yesToMigrations :: Bool,
smpCfg :: ProtocolClientConfig,
ntfCfg :: ProtocolClientConfig,
reconnectInterval :: RetryInterval,
messageRetryInterval :: RetryInterval,
messageTimeout :: NominalDiffTime,
helloTimeout :: NominalDiffTime,
ntfCron :: Word16,
ntfWorkerDelay :: Int,
ntfSMPWorkerDelay :: Int,
ntfSubCheckInterval :: NominalDiffTime,
ntfMaxMessages :: Int,
caCertificateFile :: FilePath,
privateKeyFile :: FilePath,
certificateFile :: FilePath,
e2eEncryptVRange :: VersionRange,
smpAgentVRange :: VersionRange,
smpClientVRange :: VersionRange,
initialClientId :: Int
}
defaultReconnectInterval :: RetryInterval
defaultReconnectInterval =
RetryInterval
{ initialInterval = 2_000000,
increaseAfter = 10_000000,
maxInterval = 180_000000
}
defaultMessageRetryInterval :: RetryInterval
defaultMessageRetryInterval =
RetryInterval
{ initialInterval = 1_000000,
increaseAfter = 10_000000,
maxInterval = 60_000000
}
defaultAgentConfig :: AgentConfig
defaultAgentConfig =
AgentConfig
{ tcpPort = "5224",
cmdSignAlg = C.SignAlg C.SEd448,
connIdBytes = 12,
tbqSize = 64,
database = AgentDBFile {dbFile = "smp-agent.db", dbKey = ""},
yesToMigrations = False,
smpCfg = defaultClientConfig {defaultTransport = (show defaultSMPPort, transport @TLS)},
ntfCfg = defaultClientConfig {defaultTransport = ("443", transport @TLS)},
reconnectInterval = defaultReconnectInterval,
messageRetryInterval = defaultMessageRetryInterval,
messageTimeout = 2 * nominalDay,
helloTimeout = 2 * nominalDay,
ntfCron = 20, -- minutes
ntfWorkerDelay = 100000, -- microseconds
ntfSMPWorkerDelay = 500000, -- microseconds
ntfSubCheckInterval = nominalDay,
ntfMaxMessages = 4,
-- CA certificate private key is not needed for initialization
-- ! we do not generate these
caCertificateFile = "/etc/opt/simplex-agent/ca.crt",
privateKeyFile = "/etc/opt/simplex-agent/agent.key",
certificateFile = "/etc/opt/simplex-agent/agent.crt",
e2eEncryptVRange = supportedE2EEncryptVRange,
smpAgentVRange = supportedSMPAgentVRange,
smpClientVRange = supportedSMPClientVRange,
initialClientId = 0
}
data Env = Env
{ config :: AgentConfig,
store :: SQLiteStore,
idsDrg :: TVar ChaChaDRG,
clientCounter :: TVar Int,
randomServer :: TVar StdGen,
ntfSupervisor :: NtfSupervisor
}
newSMPAgentEnv :: (MonadUnliftIO m, MonadRandom m) => AgentConfig -> m Env
newSMPAgentEnv config@AgentConfig {database, yesToMigrations, initialClientId} = do
idsDrg <- newTVarIO =<< drgNew
store <- case database of
AgentDB st -> pure st
AgentDBFile {dbFile, dbKey} -> liftIO $ createAgentStore dbFile dbKey yesToMigrations
clientCounter <- newTVarIO initialClientId
randomServer <- newTVarIO =<< liftIO newStdGen
ntfSupervisor <- atomically . newNtfSubSupervisor $ tbqSize config
return Env {config, store, idsDrg, clientCounter, randomServer, ntfSupervisor}
createAgentStore :: FilePath -> String -> Bool -> IO SQLiteStore
createAgentStore dbFilePath dbKey = createSQLiteStore dbFilePath dbKey Migrations.app
data NtfSupervisor = NtfSupervisor
{ ntfTkn :: TVar (Maybe NtfToken),
ntfSubQ :: TBQueue (ConnId, NtfSupervisorCommand),
ntfWorkers :: TMap NtfServer (TMVar (), Async ()),
ntfSMPWorkers :: TMap SMPServer (TMVar (), Async ())
}
data NtfSupervisorCommand = NSCCreate | NSCDelete | NSCSmpDelete | NSCNtfWorker NtfServer | NSCNtfSMPWorker SMPServer
deriving (Show)
newNtfSubSupervisor :: Natural -> STM NtfSupervisor
newNtfSubSupervisor qSize = do
ntfTkn <- newTVar Nothing
ntfSubQ <- newTBQueue qSize
ntfWorkers <- TM.empty
ntfSMPWorkers <- TM.empty
pure NtfSupervisor {ntfTkn, ntfSubQ, ntfWorkers, ntfSMPWorkers}