mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-29 14:30:22 +00:00
* protocol: refactor types and encoding * clean * smp server: batch commands (#1560) * smp server: batch commands verification into one DB transaction * ghc 8.10.7 * flatten transmission tuples * diff * only use batch logic if there is more than one transmission * func * reset NTF service when adding notifier * version * Revert "smp server: use separate database pool for reading queues and creating service records (#1561)" This reverts commit3df2425162. * version * Revert "version" This reverts commitd80a6b74c5.
307 lines
12 KiB
Haskell
307 lines
12 KiB
Haskell
{-# LANGUAGE DataKinds #-}
|
|
{-# LANGUAGE DeriveAnyClass #-}
|
|
{-# LANGUAGE DeriveGeneric #-}
|
|
{-# LANGUAGE DuplicateRecordFields #-}
|
|
{-# LANGUAGE FlexibleContexts #-}
|
|
{-# LANGUAGE LambdaCase #-}
|
|
{-# LANGUAGE NamedFieldPuns #-}
|
|
{-# LANGUAGE OverloadedLists #-}
|
|
{-# LANGUAGE OverloadedStrings #-}
|
|
{-# LANGUAGE RankNTypes #-}
|
|
{-# LANGUAGE ScopedTypeVariables #-}
|
|
{-# LANGUAGE StandaloneDeriving #-}
|
|
{-# LANGUAGE TypeApplications #-}
|
|
{-# OPTIONS_GHC -fno-warn-orphans #-}
|
|
|
|
module NtfClient where
|
|
|
|
import Control.Concurrent.STM (retry)
|
|
import Control.Monad
|
|
import Control.Monad.Except (runExceptT)
|
|
import Control.Monad.IO.Class
|
|
import Data.Aeson (FromJSON (..), ToJSON (..), (.:))
|
|
import qualified Data.Aeson as J
|
|
import qualified Data.Aeson.Types as JT
|
|
import Data.ByteString.Builder (lazyByteString)
|
|
import Data.ByteString.Char8 (ByteString)
|
|
import qualified Data.ByteString.Char8 as B
|
|
import Data.List.NonEmpty (NonEmpty)
|
|
import qualified Data.Map.Strict as M
|
|
import Data.Text (Text)
|
|
import Database.PostgreSQL.Simple (ConnectInfo (..), defaultConnectInfo)
|
|
import GHC.Generics (Generic)
|
|
import Network.HTTP.Types (Status)
|
|
import qualified Network.HTTP.Types as N
|
|
import qualified Network.HTTP2.Server as H
|
|
import Network.Socket
|
|
import SMPClient (defaultStartOptions, ntfTestPort, ntfTestServerCredentials, prevRange, serverBracket)
|
|
import Simplex.Messaging.Agent.Store.Postgres.Options (DBOpts (..))
|
|
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..))
|
|
import Simplex.Messaging.Client (ProtocolClientConfig (..), chooseTransportHost, defaultNetworkConfig)
|
|
import Simplex.Messaging.Client.Agent (SMPClientAgentConfig (..), defaultSMPClientAgentConfig)
|
|
import qualified Simplex.Messaging.Crypto as C
|
|
import Simplex.Messaging.Encoding
|
|
import Simplex.Messaging.Notifications.Protocol (DeviceToken (..), NtfResponse)
|
|
import Simplex.Messaging.Notifications.Server (runNtfServerBlocking)
|
|
import Simplex.Messaging.Notifications.Server.Env
|
|
import Simplex.Messaging.Notifications.Server.Push.APNS
|
|
import Simplex.Messaging.Notifications.Server.Push.APNS.Internal
|
|
import Simplex.Messaging.Notifications.Transport
|
|
import Simplex.Messaging.Protocol
|
|
import Simplex.Messaging.Server.QueueStore.Postgres.Config (PostgresStoreCfg (..))
|
|
import qualified Simplex.Messaging.TMap as TM
|
|
import Simplex.Messaging.Transport
|
|
import Simplex.Messaging.Transport.Client
|
|
import Simplex.Messaging.Transport.HTTP2 (HTTP2Body (..), http2TLSParams)
|
|
import Simplex.Messaging.Transport.HTTP2.Server
|
|
import Simplex.Messaging.Transport.Server
|
|
import Test.Hspec hiding (fit, it)
|
|
import UnliftIO.Async
|
|
import UnliftIO.Concurrent
|
|
import qualified UnliftIO.Exception as E
|
|
import UnliftIO.STM
|
|
|
|
testHost :: NonEmpty TransportHost
|
|
testHost = "localhost"
|
|
|
|
apnsTestPort :: ServiceName
|
|
apnsTestPort = "6010"
|
|
|
|
testKeyHash :: C.KeyHash
|
|
testKeyHash = "LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI="
|
|
|
|
ntfTestStoreLogFile :: FilePath
|
|
ntfTestStoreLogFile = "tests/tmp/ntf-server-store.log"
|
|
|
|
ntfTestStoreLogFile2 :: FilePath
|
|
ntfTestStoreLogFile2 = "tests/tmp/ntf-server-store.log.2"
|
|
|
|
ntfTestStoreLastNtfsFile :: FilePath
|
|
ntfTestStoreLastNtfsFile = "tests/tmp/ntf-server-last-notifications.log"
|
|
|
|
ntfTestPrometheusMetricsFile :: FilePath
|
|
ntfTestPrometheusMetricsFile = "tests/tmp/ntf-server-metrics.txt"
|
|
|
|
ntfTestStoreDBOpts :: DBOpts
|
|
ntfTestStoreDBOpts =
|
|
DBOpts
|
|
{ connstr = ntfTestServerDBConnstr,
|
|
schema = "ntf_server",
|
|
poolSize = 3,
|
|
createSchema = True
|
|
}
|
|
|
|
ntfTestStoreDBOpts2 :: DBOpts
|
|
ntfTestStoreDBOpts2 = ntfTestStoreDBOpts {schema = "smp_server2"}
|
|
|
|
ntfTestServerDBConnstr :: ByteString
|
|
ntfTestServerDBConnstr = "postgresql://ntf_test_server_user@/ntf_test_server_db"
|
|
|
|
ntfTestServerDBConnectInfo :: ConnectInfo
|
|
ntfTestServerDBConnectInfo =
|
|
defaultConnectInfo
|
|
{ connectUser = "ntf_test_server_user",
|
|
connectDatabase = "ntf_test_server_db"
|
|
}
|
|
|
|
ntfTestDBCfg :: PostgresStoreCfg
|
|
ntfTestDBCfg =
|
|
PostgresStoreCfg
|
|
{ dbOpts = ntfTestStoreDBOpts,
|
|
dbStoreLogPath = Just ntfTestStoreLogFile,
|
|
confirmMigrations = MCYesUp,
|
|
deletedTTL = 86400
|
|
}
|
|
|
|
ntfTestDBCfg2 :: PostgresStoreCfg
|
|
ntfTestDBCfg2 = ntfTestDBCfg {dbOpts = ntfTestStoreDBOpts2, dbStoreLogPath = Just ntfTestStoreLogFile2}
|
|
|
|
testNtfClient :: Transport c => (THandleNTF c 'TClient -> IO a) -> IO a
|
|
testNtfClient client = do
|
|
Right host <- pure $ chooseTransportHost defaultNetworkConfig testHost
|
|
runTransportClient defaultTransportClientConfig Nothing host ntfTestPort (Just testKeyHash) $ \h ->
|
|
runExceptT (ntfClientHandshake h testKeyHash supportedClientNTFVRange False Nothing) >>= \case
|
|
Right th -> client th
|
|
Left e -> error $ show e
|
|
|
|
ntfServerCfg :: NtfServerConfig
|
|
ntfServerCfg =
|
|
NtfServerConfig
|
|
{ transports = [],
|
|
controlPort = Nothing,
|
|
controlPortUserAuth = Nothing,
|
|
controlPortAdminAuth = Nothing,
|
|
subIdBytes = 24,
|
|
regCodeBytes = 32,
|
|
clientQSize = 2,
|
|
pushQSize = 2,
|
|
smpAgentCfg = defaultSMPClientAgentConfig {persistErrorInterval = 0},
|
|
apnsConfig =
|
|
defaultAPNSPushClientConfig
|
|
{ apnsPort = apnsTestPort,
|
|
caStoreFile = "tests/fixtures/ca.crt"
|
|
},
|
|
subsBatchSize = 900,
|
|
inactiveClientExpiration = Just defaultInactiveClientExpiration,
|
|
dbStoreConfig = ntfTestDBCfg,
|
|
ntfCredentials = ntfTestServerCredentials,
|
|
useServiceCreds = True,
|
|
periodicNtfsInterval = 1,
|
|
-- stats config
|
|
logStatsInterval = Nothing,
|
|
logStatsStartTime = 0,
|
|
serverStatsLogFile = "tests/ntf-server-stats.daily.log",
|
|
serverStatsBackupFile = Nothing,
|
|
prometheusInterval = Nothing,
|
|
prometheusMetricsFile = ntfTestPrometheusMetricsFile,
|
|
ntfServerVRange = supportedServerNTFVRange,
|
|
transportConfig = mkTransportServerConfig True (Just alpnSupportedNTFHandshakes) False,
|
|
startOptions = defaultStartOptions
|
|
}
|
|
|
|
ntfServerCfgVPrev :: NtfServerConfig
|
|
ntfServerCfgVPrev =
|
|
ntfServerCfg
|
|
{ ntfServerVRange = prevRange $ ntfServerVRange ntfServerCfg,
|
|
smpAgentCfg = smpAgentCfg' {smpCfg = smpCfg' {serverVRange = prevRange serverVRange'}}
|
|
}
|
|
where
|
|
smpAgentCfg' = smpAgentCfg ntfServerCfg
|
|
smpCfg' = smpCfg smpAgentCfg'
|
|
serverVRange' = serverVRange smpCfg'
|
|
|
|
withNtfServerThreadOn :: HasCallStack => ASrvTransport -> ServiceName -> PostgresStoreCfg -> (HasCallStack => ThreadId -> IO a) -> IO a
|
|
withNtfServerThreadOn t port' dbStoreConfig =
|
|
withNtfServerCfg ntfServerCfg {transports = [(port', t, False)], dbStoreConfig}
|
|
|
|
withNtfServerCfg :: HasCallStack => NtfServerConfig -> (ThreadId -> IO a) -> IO a
|
|
withNtfServerCfg cfg@NtfServerConfig {transports} =
|
|
case transports of
|
|
[] -> error "no transports configured"
|
|
_ ->
|
|
serverBracket
|
|
(\started -> runNtfServerBlocking started cfg)
|
|
(pure ())
|
|
|
|
withNtfServerOn :: HasCallStack => ASrvTransport -> ServiceName -> PostgresStoreCfg -> (HasCallStack => IO a) -> IO a
|
|
withNtfServerOn t port' dbStoreConfig = withNtfServerThreadOn t port' dbStoreConfig . const
|
|
|
|
withNtfServer :: HasCallStack => ASrvTransport -> (HasCallStack => IO a) -> IO a
|
|
withNtfServer t = withNtfServerOn t ntfTestPort ntfTestDBCfg
|
|
|
|
runNtfTest :: forall c a. Transport c => (THandleNTF c 'TClient -> IO a) -> IO a
|
|
runNtfTest test = withNtfServer (transport @c) $ testNtfClient test
|
|
|
|
ntfServerTest ::
|
|
forall c smp.
|
|
(Transport c, Encoding smp) =>
|
|
TProxy c 'TServer ->
|
|
(Maybe TAuthorizations, ByteString, ByteString, smp) ->
|
|
IO (Maybe TAuthorizations, ByteString, ByteString, NtfResponse)
|
|
ntfServerTest _ t = runNtfTest $ \h -> tPut' h t >> tGet' h
|
|
where
|
|
tPut' :: THandleNTF c 'TClient -> (Maybe TAuthorizations, ByteString, ByteString, smp) -> IO ()
|
|
tPut' h@THandle {params = THandleParams {sessionId, implySessId}} (sig, corrId, queueId, smp) = do
|
|
let t' = if implySessId then smpEncode (corrId, queueId, smp) else smpEncode (sessionId, corrId, queueId, smp)
|
|
[Right ()] <- tPut h [Right (sig, t')]
|
|
pure ()
|
|
tGet' h = do
|
|
[(CorrId corrId, EntityId qId, Right cmd)] <- tGetClient h
|
|
pure (Nothing, corrId, qId, cmd)
|
|
|
|
ntfTest :: Transport c => TProxy c 'TServer -> (THandleNTF c 'TClient -> IO ()) -> Expectation
|
|
ntfTest _ test' = runNtfTest test' `shouldReturn` ()
|
|
|
|
data APNSMockRequest = APNSMockRequest
|
|
{ notification :: APNSNotification
|
|
}
|
|
|
|
data APNSMockResponse = APNSRespOk | APNSRespError Status Text
|
|
|
|
data APNSMockServer = APNSMockServer
|
|
{ action :: Async (),
|
|
notifications :: TM.TMap ByteString (TBQueue APNSMockRequest),
|
|
http2Server :: HTTP2Server
|
|
}
|
|
|
|
apnsMockServerConfig :: HTTP2ServerConfig
|
|
apnsMockServerConfig =
|
|
HTTP2ServerConfig
|
|
{ qSize = 2,
|
|
http2Port = apnsTestPort,
|
|
bufferSize = 16384,
|
|
bodyHeadSize = 16384,
|
|
serverSupported = http2TLSParams,
|
|
https2Credentials =
|
|
ServerCredentials
|
|
{ caCertificateFile = Just "tests/fixtures/ca.crt",
|
|
privateKeyFile = "tests/fixtures/server.key",
|
|
certificateFile = "tests/fixtures/server.crt"
|
|
},
|
|
transportConfig = mkTransportServerConfig True Nothing False
|
|
}
|
|
|
|
withAPNSMockServer :: (APNSMockServer -> IO a) -> IO a
|
|
withAPNSMockServer = E.bracket (getAPNSMockServer apnsMockServerConfig) closeAPNSMockServer
|
|
|
|
deriving instance Generic APNSAlertBody
|
|
|
|
instance FromJSON APNSAlertBody where
|
|
parseJSON (J.Object v) = do
|
|
title <- v .: "title"
|
|
subtitle <- v .: "subtitle"
|
|
body <- v .: "body"
|
|
pure APNSAlertObject {title, subtitle, body}
|
|
parseJSON (J.String v) = pure $ APNSAlertText v
|
|
parseJSON invalid = JT.prependFailure "parsing Coord failed, " (JT.typeMismatch "Object" invalid)
|
|
|
|
deriving instance Generic APNSNotificationBody
|
|
|
|
instance FromJSON APNSNotificationBody where parseJSON = J.genericParseJSON apnsJSONOptions {J.rejectUnknownFields = True}
|
|
|
|
deriving instance Generic APNSNotification
|
|
|
|
deriving instance FromJSON APNSNotification
|
|
|
|
deriving instance Generic APNSErrorResponse
|
|
|
|
deriving instance ToJSON APNSErrorResponse
|
|
|
|
getAPNSMockServer :: HTTP2ServerConfig -> IO APNSMockServer
|
|
getAPNSMockServer config@HTTP2ServerConfig {qSize} = do
|
|
http2Server <- getHTTP2Server config
|
|
notifications <- TM.emptyIO
|
|
action <- async $ runAPNSMockServer notifications http2Server
|
|
pure APNSMockServer {action, notifications, http2Server}
|
|
where
|
|
runAPNSMockServer notifications HTTP2Server {reqQ} = forever $ do
|
|
HTTP2Request {request, reqBody = HTTP2Body {bodyHead}, sendResponse} <- atomically $ readTBQueue reqQ
|
|
let sendApnsResponse = \case
|
|
APNSRespOk -> sendResponse $ H.responseNoBody N.ok200 []
|
|
APNSRespError status reason ->
|
|
sendResponse . H.responseBuilder status [] . lazyByteString $ J.encode APNSErrorResponse {reason}
|
|
case J.decodeStrict' bodyHead of
|
|
Just notification -> do
|
|
Just token <- pure $ B.stripPrefix "/3/device/" =<< H.requestPath request
|
|
q <- atomically $ TM.lookup token notifications >>= maybe (newTokenQueue token) pure
|
|
atomically $ writeTBQueue q APNSMockRequest {notification}
|
|
sendApnsResponse APNSRespOk
|
|
where
|
|
newTokenQueue token = newTBQueue qSize >>= \q -> TM.insert token q notifications >> pure q
|
|
_ -> do
|
|
putStrLn $ "runAPNSMockServer J.decodeStrict' error, reqBody: " <> show bodyHead
|
|
sendApnsResponse $ APNSRespError N.badRequest400 "bad_request_body"
|
|
|
|
getMockNotification :: MonadIO m => APNSMockServer -> DeviceToken -> m APNSMockRequest
|
|
getMockNotification APNSMockServer {notifications} (DeviceToken _ token) = do
|
|
atomically $ TM.lookup token notifications >>= maybe retry readTBQueue
|
|
|
|
getAnyMockNotification :: MonadIO m => APNSMockServer -> m APNSMockRequest
|
|
getAnyMockNotification APNSMockServer {notifications} = do
|
|
atomically $ readTVar notifications >>= mapM readTBQueue . M.elems >>= \case [] -> retry; ntf : _ -> pure ntf
|
|
|
|
closeAPNSMockServer :: APNSMockServer -> IO ()
|
|
closeAPNSMockServer APNSMockServer {action, http2Server} = do
|
|
closeHTTP2Server http2Server
|
|
uninterruptibleCancel action
|