Files
simplexmq/tests/NtfClient.hs

372 lines
15 KiB
Haskell

{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedLists #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
module NtfClient where
import Control.Concurrent.STM (retry)
import Control.Exception (throwIO)
import Control.Monad
import Control.Monad.Except (runExceptT)
import Control.Monad.IO.Class
import Data.Aeson (FromJSON (..), ToJSON (..), (.:))
import qualified Data.Aeson as J
import qualified Data.Aeson.Types as JT
import Data.ByteString.Builder (lazyByteString)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.List.NonEmpty (NonEmpty)
import qualified Data.Map.Strict as M
import Data.Text (Text)
import Database.PostgreSQL.Simple (ConnectInfo (..), defaultConnectInfo)
import GHC.Generics (Generic)
import Network.HTTP.Types (Status)
import qualified Network.HTTP.Types as N
import qualified Network.HTTP2.Server as H
import Network.Socket
import SMPClient (defaultStartOptions, ntfTestPort, ntfTestServerCredentials, prevRange, serverBracket)
import Simplex.Messaging.Agent.Store.Postgres.Options (DBOpts (..))
import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..))
import Simplex.Messaging.Client (ProtocolClientConfig (..), chooseTransportHost, defaultNetworkConfig)
import Simplex.Messaging.Client.Agent (SMPClientAgentConfig (..), defaultSMPClientAgentConfig)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
import Simplex.Messaging.Notifications.Protocol (DeviceToken(..), NtfResponse, WPTokenParams(..))
import Simplex.Messaging.Notifications.Server (runNtfServerBlocking)
import Simplex.Messaging.Notifications.Server.Env
import Simplex.Messaging.Notifications.Server.Main (getVapidKey)
import Simplex.Messaging.Notifications.Server.Push.APNS
import Simplex.Messaging.Notifications.Server.Push.APNS.Internal
import Simplex.Messaging.Notifications.Server.Push.WebPush (WebPushConfig (..))
import Simplex.Messaging.Notifications.Transport
import Simplex.Messaging.Protocol
import Simplex.Messaging.Server.QueueStore.Postgres.Config (PostgresStoreCfg (..))
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport
import Simplex.Messaging.Transport.Client
import Simplex.Messaging.Transport.HTTP2 (HTTP2Body (..), http2TLSParams)
import Simplex.Messaging.Transport.HTTP2.Server
import Simplex.Messaging.Transport.Server
import Test.Hspec hiding (fit, it)
import UnliftIO.Async
import UnliftIO.Concurrent
import qualified UnliftIO.Exception as E
import UnliftIO.STM
import Data.Aeson.Types ((.=))
import qualified Network.HPACK as H
import qualified Network.HPACK.Token as H
import Data.Maybe (fromMaybe)
testHost :: NonEmpty TransportHost
testHost = "localhost"
apnsTestPort :: ServiceName
apnsTestPort = "6010"
wpTestPort :: ServiceName
wpTestPort = "6011"
testKeyHash :: C.KeyHash
testKeyHash = "LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI="
ntfTestStoreLogFile :: FilePath
ntfTestStoreLogFile = "tests/tmp/ntf-server-store.log"
ntfTestStoreLogFile2 :: FilePath
ntfTestStoreLogFile2 = "tests/tmp/ntf-server-store.log.2"
ntfTestStoreLastNtfsFile :: FilePath
ntfTestStoreLastNtfsFile = "tests/tmp/ntf-server-last-notifications.log"
ntfTestPrometheusMetricsFile :: FilePath
ntfTestPrometheusMetricsFile = "tests/tmp/ntf-server-metrics.txt"
ntfTestStoreDBOpts :: DBOpts
ntfTestStoreDBOpts =
DBOpts
{ connstr = ntfTestServerDBConnstr,
schema = "ntf_server",
poolSize = 3,
createSchema = True
}
ntfTestStoreDBOpts2 :: DBOpts
ntfTestStoreDBOpts2 = ntfTestStoreDBOpts {schema = "smp_server2"}
ntfTestServerDBConnstr :: ByteString
ntfTestServerDBConnstr = "postgresql://ntf_test_server_user@/ntf_test_server_db"
ntfTestServerDBConnectInfo :: ConnectInfo
ntfTestServerDBConnectInfo =
defaultConnectInfo
{ connectUser = "ntf_test_server_user",
connectDatabase = "ntf_test_server_db"
}
ntfTestDBCfg :: PostgresStoreCfg
ntfTestDBCfg =
PostgresStoreCfg
{ dbOpts = ntfTestStoreDBOpts,
dbStoreLogPath = Just ntfTestStoreLogFile,
confirmMigrations = MCYesUp,
deletedTTL = 86400
}
ntfTestDBCfg2 :: PostgresStoreCfg
ntfTestDBCfg2 = ntfTestDBCfg {dbOpts = ntfTestStoreDBOpts2, dbStoreLogPath = Just ntfTestStoreLogFile2}
testNtfClient :: Transport c => (THandleNTF c 'TClient -> IO a) -> IO a
testNtfClient client = do
Right host <- pure $ chooseTransportHost defaultNetworkConfig testHost
runTransportClient defaultTransportClientConfig Nothing host ntfTestPort (Just testKeyHash) $ \h ->
runExceptT (ntfClientHandshake h testKeyHash supportedClientNTFVRange False Nothing) >>= \case
Right th -> client th
Left e -> error $ show e
ntfServerCfg :: IO NtfServerConfig
ntfServerCfg = do
vapidKey <- getVapidKey "tests/fixtures/vapid.privkey"
pure
NtfServerConfig
{ transports = [],
controlPort = Nothing,
controlPortUserAuth = Nothing,
controlPortAdminAuth = Nothing,
subIdBytes = 24,
regCodeBytes = 32,
clientQSize = 2,
pushQSize = 2,
smpAgentCfg = defaultSMPClientAgentConfig {persistErrorInterval = 0},
apnsConfig =
defaultAPNSPushClientConfig
{ apnsPort = apnsTestPort,
caStoreFile = "tests/fixtures/ca.crt"
},
wpConfig = WebPushConfig {vapidKey, paddedNtfLength = 3072},
subsBatchSize = 900,
inactiveClientExpiration = Just defaultInactiveClientExpiration,
dbStoreConfig = ntfTestDBCfg,
ntfCredentials = ntfTestServerCredentials,
useServiceCreds = True,
periodicNtfsInterval = 1,
-- stats config
logStatsInterval = Nothing,
logStatsStartTime = 0,
serverStatsLogFile = "tests/ntf-server-stats.daily.log",
serverStatsBackupFile = Nothing,
prometheusInterval = Nothing,
prometheusMetricsFile = ntfTestPrometheusMetricsFile,
ntfServerVRange = supportedServerNTFVRange,
transportConfig = mkTransportServerConfig True (Just alpnSupportedNTFHandshakes) False,
startOptions = defaultStartOptions
}
ntfServerCfgVPrev :: IO NtfServerConfig
ntfServerCfgVPrev =
ntfServerCfg
>>= \cfg -> pure $ ntfServerCfgVPrev' cfg
ntfServerCfgVPrev' :: NtfServerConfig -> NtfServerConfig
ntfServerCfgVPrev' cfg =
cfg
{ ntfServerVRange = prevRange $ ntfServerVRange cfg,
smpAgentCfg = smpAgentCfg' {smpCfg = smpCfg' {serverVRange = prevRange serverVRange'}}
}
where
smpAgentCfg' = smpAgentCfg cfg
smpCfg' = smpCfg smpAgentCfg'
serverVRange' = serverVRange smpCfg'
withNtfServerThreadOn :: HasCallStack => ASrvTransport -> ServiceName -> PostgresStoreCfg -> (HasCallStack => ThreadId -> IO a) -> IO a
withNtfServerThreadOn t port' dbStoreConfig a =
ntfServerCfg >>= \cfg ->
withNtfServerCfg cfg {transports = [(port', t, False)], dbStoreConfig} a
withNtfServerCfg :: HasCallStack => NtfServerConfig -> (ThreadId -> IO a) -> IO a
withNtfServerCfg cfg@NtfServerConfig {transports} =
case transports of
[] -> error "no transports configured"
_ ->
serverBracket
(\started -> runNtfServerBlocking started cfg)
(pure ())
withNtfServerOn :: HasCallStack => ASrvTransport -> ServiceName -> PostgresStoreCfg -> (HasCallStack => IO a) -> IO a
withNtfServerOn t port' dbStoreConfig = withNtfServerThreadOn t port' dbStoreConfig . const
withNtfServer :: HasCallStack => ASrvTransport -> (HasCallStack => IO a) -> IO a
withNtfServer t = withNtfServerOn t ntfTestPort ntfTestDBCfg
runNtfTest :: forall c a. Transport c => (THandleNTF c 'TClient -> IO a) -> IO a
runNtfTest test = withNtfServer (transport @c) $ testNtfClient test
ntfServerTest ::
forall c smp.
(Transport c, Encoding smp) =>
TProxy c 'TServer ->
(Maybe TAuthorizations, ByteString, ByteString, smp) ->
IO (Maybe TAuthorizations, ByteString, ByteString, NtfResponse)
ntfServerTest _ t = runNtfTest $ \h -> tPut' h t >> tGet' h
where
tPut' :: THandleNTF c 'TClient -> (Maybe TAuthorizations, ByteString, ByteString, smp) -> IO ()
tPut' h@THandle {params = THandleParams {sessionId, implySessId}} (sig, corrId, queueId, smp) = do
let t' = if implySessId then smpEncode (corrId, queueId, smp) else smpEncode (sessionId, corrId, queueId, smp)
[Right ()] <- tPut h [Right (sig, t')]
pure ()
tGet' h = do
[(CorrId corrId, EntityId qId, Right cmd)] <- tGetClient h
pure (Nothing, corrId, qId, cmd)
ntfTest :: Transport c => TProxy c 'TServer -> (THandleNTF c 'TClient -> IO ()) -> Expectation
ntfTest _ test' = runNtfTest test' `shouldReturn` ()
data PushMockRequest a = PushMockRequest
{ notification :: a
}
data PushMockResponse = PushRespOk | PushRespError Status Text
data PushMockServer a = PushMockServer
{ action :: Async (),
notifications :: TM.TMap ByteString (TBQueue (PushMockRequest a)),
http2Server :: HTTP2Server
}
data WPNotification = WPNotification
{ authorization :: Maybe ByteString,
encoding :: Maybe ByteString,
ttl :: Maybe ByteString,
urgency :: Maybe ByteString,
body :: ByteString
}
newtype APNSMockServer = APNSMockServer (PushMockServer APNSNotification)
newtype WPMockServer = WPMockServer (PushMockServer WPNotification)
pushMockServerConfig :: ServiceName -> HTTP2ServerConfig
pushMockServerConfig port =
HTTP2ServerConfig
{ qSize = 2,
http2Port = port,
bufferSize = 16384,
bodyHeadSize = 16384,
serverSupported = http2TLSParams,
https2Credentials =
ServerCredentials
{ caCertificateFile = Just "tests/fixtures/ca.crt",
privateKeyFile = "tests/fixtures/server.key",
certificateFile = "tests/fixtures/server.crt"
},
transportConfig = mkTransportServerConfig True Nothing False
}
withAPNSMockServer :: (APNSMockServer -> IO a) -> IO a
withAPNSMockServer = E.bracket (getAPNSMockServer $ pushMockServerConfig apnsTestPort) closeAPNSMockServer
where
closeAPNSMockServer (APNSMockServer a) = closePushMockServer a
withWPMockServer :: (WPMockServer -> IO a) -> IO a
withWPMockServer = E.bracket (getWPMockServer $ pushMockServerConfig wpTestPort) closeWPMockServer
where
closeWPMockServer (WPMockServer a) = closePushMockServer a
deriving instance Generic APNSAlertBody
instance FromJSON APNSAlertBody where
parseJSON (J.Object v) = do
title <- v .: "title"
subtitle <- v .: "subtitle"
body <- v .: "body"
pure APNSAlertObject {title, subtitle, body}
parseJSON (J.String v) = pure $ APNSAlertText v
parseJSON invalid = JT.prependFailure "parsing Coord failed, " (JT.typeMismatch "Object" invalid)
deriving instance Generic APNSNotificationBody
instance FromJSON APNSNotificationBody where parseJSON = J.genericParseJSON apnsJSONOptions {J.rejectUnknownFields = True}
deriving instance Generic APNSNotification
deriving instance FromJSON APNSNotification
deriving instance Generic APNSErrorResponse
deriving instance ToJSON APNSErrorResponse
getAPNSMockServer :: HTTP2ServerConfig -> IO APNSMockServer
getAPNSMockServer config@HTTP2ServerConfig {qSize} = do
http2Server <- getHTTP2Server config
notifications <- TM.emptyIO
action <- async $ runAPNSMockServer notifications http2Server
pure $ APNSMockServer PushMockServer {action, notifications, http2Server}
where
runAPNSMockServer notifications HTTP2Server {reqQ} = forever $ do
HTTP2Request {request, reqBody = HTTP2Body {bodyHead}, sendResponse} <- atomically $ readTBQueue reqQ
let sendApnsResponse = \case
PushRespOk -> sendResponse $ H.responseNoBody N.ok200 []
PushRespError status reason ->
sendResponse . H.responseBuilder status [] . lazyByteString $ J.encode APNSErrorResponse {reason}
case J.decodeStrict' bodyHead of
Just notification -> do
Just token <- pure $ B.stripPrefix "/3/device/" =<< H.requestPath request
q <- atomically $ TM.lookup token notifications >>= maybe (newTokenQueue token) pure
atomically $ writeTBQueue q PushMockRequest {notification}
sendApnsResponse PushRespOk
where
newTokenQueue token = newTBQueue qSize >>= \q -> TM.insert token q notifications >> pure q
_ -> do
putStrLn $ "runAPNSMockServer J.decodeStrict' error, reqBody: " <> show bodyHead
sendApnsResponse $ PushRespError N.badRequest400 "bad_request_body"
getWPMockServer :: HTTP2ServerConfig -> IO WPMockServer
getWPMockServer config@HTTP2ServerConfig {qSize} = do
http2Server <- getHTTP2Server config
notifications <- TM.emptyIO
action <- async $ runWPMockServer notifications http2Server
pure $ WPMockServer PushMockServer {action, notifications, http2Server}
where
runWPMockServer notifications HTTP2Server {reqQ} = forever $ do
HTTP2Request {request, reqBody = HTTP2Body {bodyHead}, sendResponse} <- atomically $ readTBQueue reqQ
let sendWPResponse = \case
PushRespOk -> sendResponse $ H.responseNoBody N.ok200 []
PushRespError status reason ->
sendResponse . H.responseBuilder status [] . lazyByteString $ J.encode $ J.object [ "error" .= reason]
path = fromMaybe "/default" $ H.requestPath request
(_, headers) = H.requestHeaders request
authorization = H.getHeaderValue H.tokenAuthorization headers
encoding = H.getHeaderValue H.tokenContentEncoding headers
ttl = H.getHeaderValue (H.toToken "TTL") headers
urgency = H.getHeaderValue (H.toToken "urgency") headers
notification = WPNotification {body = bodyHead, authorization, encoding, ttl, urgency}
q <- atomically $ TM.lookup path notifications >>= maybe (newTokenQueue path) pure
atomically $ writeTBQueue q PushMockRequest {notification}
sendWPResponse PushRespOk
where
newTokenQueue path = newTBQueue qSize >>= \q -> TM.insert path q notifications >> pure q
getMockNotification :: MonadIO m => PushMockServer a -> DeviceToken -> m (PushMockRequest a)
getMockNotification PushMockServer {notifications} (WPDeviceToken _ (WPTokenParams path _)) = do
atomically $ TM.lookup path notifications >>= maybe retry readTBQueue
getMockNotification PushMockServer {notifications} (APNSDeviceToken _ token) = do
atomically $ TM.lookup token notifications >>= maybe retry readTBQueue
getAnyMockNotification :: MonadIO m => PushMockServer a -> m (PushMockRequest a)
getAnyMockNotification PushMockServer {notifications} = do
atomically $ readTVar notifications >>= mapM readTBQueue . M.elems >>= \case [] -> retry; ntf : _ -> pure ntf
closePushMockServer :: PushMockServer a -> IO ()
closePushMockServer PushMockServer {action, http2Server} = do
closeHTTP2Server http2Server
uninterruptibleCancel action