{-# LANGUAGE DataKinds #-} {-# LANGUAGE DeriveAnyClass #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedLists #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TypeApplications #-} {-# OPTIONS_GHC -fno-warn-orphans #-} module NtfClient where import Control.Concurrent.STM (retry) import Control.Exception (throwIO) import Control.Monad import Control.Monad.Except (runExceptT) import Control.Monad.IO.Class import Data.Aeson (FromJSON (..), ToJSON (..), (.:)) import qualified Data.Aeson as J import qualified Data.Aeson.Types as JT import Data.ByteString.Builder (lazyByteString) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.List.NonEmpty (NonEmpty) import qualified Data.Map.Strict as M import Data.Text (Text) import Database.PostgreSQL.Simple (ConnectInfo (..), defaultConnectInfo) import GHC.Generics (Generic) import Network.HTTP.Types (Status) import qualified Network.HTTP.Types as N import qualified Network.HTTP2.Server as H import Network.Socket import SMPClient (defaultStartOptions, ntfTestPort, ntfTestServerCredentials, prevRange, serverBracket) import Simplex.Messaging.Agent.Store.Postgres.Options (DBOpts (..)) import Simplex.Messaging.Agent.Store.Shared (MigrationConfirmation (..)) import Simplex.Messaging.Client (ProtocolClientConfig (..), chooseTransportHost, defaultNetworkConfig) import Simplex.Messaging.Client.Agent (SMPClientAgentConfig (..), defaultSMPClientAgentConfig) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding import Simplex.Messaging.Notifications.Protocol (DeviceToken(..), NtfResponse, WPTokenParams(..)) import Simplex.Messaging.Notifications.Server (runNtfServerBlocking) import Simplex.Messaging.Notifications.Server.Env import Simplex.Messaging.Notifications.Server.Main (getVapidKey) import Simplex.Messaging.Notifications.Server.Push.APNS import Simplex.Messaging.Notifications.Server.Push.APNS.Internal import Simplex.Messaging.Notifications.Server.Push.WebPush (WebPushConfig (..)) import Simplex.Messaging.Notifications.Transport import Simplex.Messaging.Protocol import Simplex.Messaging.Server.QueueStore.Postgres.Config (PostgresStoreCfg (..)) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport import Simplex.Messaging.Transport.Client import Simplex.Messaging.Transport.HTTP2 (HTTP2Body (..), http2TLSParams) import Simplex.Messaging.Transport.HTTP2.Server import Simplex.Messaging.Transport.Server import Test.Hspec hiding (fit, it) import UnliftIO.Async import UnliftIO.Concurrent import qualified UnliftIO.Exception as E import UnliftIO.STM import Data.Aeson.Types ((.=)) import qualified Network.HPACK as H import qualified Network.HPACK.Token as H import Data.Maybe (fromMaybe) testHost :: NonEmpty TransportHost testHost = "localhost" apnsTestPort :: ServiceName apnsTestPort = "6010" wpTestPort :: ServiceName wpTestPort = "6011" testKeyHash :: C.KeyHash testKeyHash = "LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=" ntfTestStoreLogFile :: FilePath ntfTestStoreLogFile = "tests/tmp/ntf-server-store.log" ntfTestStoreLogFile2 :: FilePath ntfTestStoreLogFile2 = "tests/tmp/ntf-server-store.log.2" ntfTestStoreLastNtfsFile :: FilePath ntfTestStoreLastNtfsFile = "tests/tmp/ntf-server-last-notifications.log" ntfTestPrometheusMetricsFile :: FilePath ntfTestPrometheusMetricsFile = "tests/tmp/ntf-server-metrics.txt" ntfTestStoreDBOpts :: DBOpts ntfTestStoreDBOpts = DBOpts { connstr = ntfTestServerDBConnstr, schema = "ntf_server", poolSize = 3, createSchema = True } ntfTestStoreDBOpts2 :: DBOpts ntfTestStoreDBOpts2 = ntfTestStoreDBOpts {schema = "smp_server2"} ntfTestServerDBConnstr :: ByteString ntfTestServerDBConnstr = "postgresql://ntf_test_server_user@/ntf_test_server_db" ntfTestServerDBConnectInfo :: ConnectInfo ntfTestServerDBConnectInfo = defaultConnectInfo { connectUser = "ntf_test_server_user", connectDatabase = "ntf_test_server_db" } ntfTestDBCfg :: PostgresStoreCfg ntfTestDBCfg = PostgresStoreCfg { dbOpts = ntfTestStoreDBOpts, dbStoreLogPath = Just ntfTestStoreLogFile, confirmMigrations = MCYesUp, deletedTTL = 86400 } ntfTestDBCfg2 :: PostgresStoreCfg ntfTestDBCfg2 = ntfTestDBCfg {dbOpts = ntfTestStoreDBOpts2, dbStoreLogPath = Just ntfTestStoreLogFile2} testNtfClient :: Transport c => (THandleNTF c 'TClient -> IO a) -> IO a testNtfClient client = do Right host <- pure $ chooseTransportHost defaultNetworkConfig testHost runTransportClient defaultTransportClientConfig Nothing host ntfTestPort (Just testKeyHash) $ \h -> runExceptT (ntfClientHandshake h testKeyHash supportedClientNTFVRange False Nothing) >>= \case Right th -> client th Left e -> error $ show e ntfServerCfg :: IO NtfServerConfig ntfServerCfg = do vapidKey <- getVapidKey "tests/fixtures/vapid.privkey" pure NtfServerConfig { transports = [], controlPort = Nothing, controlPortUserAuth = Nothing, controlPortAdminAuth = Nothing, subIdBytes = 24, regCodeBytes = 32, clientQSize = 2, pushQSize = 2, smpAgentCfg = defaultSMPClientAgentConfig {persistErrorInterval = 0}, apnsConfig = defaultAPNSPushClientConfig { apnsPort = apnsTestPort, caStoreFile = "tests/fixtures/ca.crt" }, wpConfig = WebPushConfig {vapidKey, paddedNtfLength = 3072}, subsBatchSize = 900, inactiveClientExpiration = Just defaultInactiveClientExpiration, dbStoreConfig = ntfTestDBCfg, ntfCredentials = ntfTestServerCredentials, useServiceCreds = True, periodicNtfsInterval = 1, -- stats config logStatsInterval = Nothing, logStatsStartTime = 0, serverStatsLogFile = "tests/ntf-server-stats.daily.log", serverStatsBackupFile = Nothing, prometheusInterval = Nothing, prometheusMetricsFile = ntfTestPrometheusMetricsFile, ntfServerVRange = supportedServerNTFVRange, transportConfig = mkTransportServerConfig True (Just alpnSupportedNTFHandshakes) False, startOptions = defaultStartOptions } ntfServerCfgVPrev :: IO NtfServerConfig ntfServerCfgVPrev = ntfServerCfg >>= \cfg -> pure $ ntfServerCfgVPrev' cfg ntfServerCfgVPrev' :: NtfServerConfig -> NtfServerConfig ntfServerCfgVPrev' cfg = cfg { ntfServerVRange = prevRange $ ntfServerVRange cfg, smpAgentCfg = smpAgentCfg' {smpCfg = smpCfg' {serverVRange = prevRange serverVRange'}} } where smpAgentCfg' = smpAgentCfg cfg smpCfg' = smpCfg smpAgentCfg' serverVRange' = serverVRange smpCfg' withNtfServerThreadOn :: HasCallStack => ASrvTransport -> ServiceName -> PostgresStoreCfg -> (HasCallStack => ThreadId -> IO a) -> IO a withNtfServerThreadOn t port' dbStoreConfig a = ntfServerCfg >>= \cfg -> withNtfServerCfg cfg {transports = [(port', t, False)], dbStoreConfig} a withNtfServerCfg :: HasCallStack => NtfServerConfig -> (ThreadId -> IO a) -> IO a withNtfServerCfg cfg@NtfServerConfig {transports} = case transports of [] -> error "no transports configured" _ -> serverBracket (\started -> runNtfServerBlocking started cfg) (pure ()) withNtfServerOn :: HasCallStack => ASrvTransport -> ServiceName -> PostgresStoreCfg -> (HasCallStack => IO a) -> IO a withNtfServerOn t port' dbStoreConfig = withNtfServerThreadOn t port' dbStoreConfig . const withNtfServer :: HasCallStack => ASrvTransport -> (HasCallStack => IO a) -> IO a withNtfServer t = withNtfServerOn t ntfTestPort ntfTestDBCfg runNtfTest :: forall c a. Transport c => (THandleNTF c 'TClient -> IO a) -> IO a runNtfTest test = withNtfServer (transport @c) $ testNtfClient test ntfServerTest :: forall c smp. (Transport c, Encoding smp) => TProxy c 'TServer -> (Maybe TAuthorizations, ByteString, ByteString, smp) -> IO (Maybe TAuthorizations, ByteString, ByteString, NtfResponse) ntfServerTest _ t = runNtfTest $ \h -> tPut' h t >> tGet' h where tPut' :: THandleNTF c 'TClient -> (Maybe TAuthorizations, ByteString, ByteString, smp) -> IO () tPut' h@THandle {params = THandleParams {sessionId, implySessId}} (sig, corrId, queueId, smp) = do let t' = if implySessId then smpEncode (corrId, queueId, smp) else smpEncode (sessionId, corrId, queueId, smp) [Right ()] <- tPut h [Right (sig, t')] pure () tGet' h = do [(CorrId corrId, EntityId qId, Right cmd)] <- tGetClient h pure (Nothing, corrId, qId, cmd) ntfTest :: Transport c => TProxy c 'TServer -> (THandleNTF c 'TClient -> IO ()) -> Expectation ntfTest _ test' = runNtfTest test' `shouldReturn` () data PushMockRequest a = PushMockRequest { notification :: a } data PushMockResponse = PushRespOk | PushRespError Status Text data PushMockServer a = PushMockServer { action :: Async (), notifications :: TM.TMap ByteString (TBQueue (PushMockRequest a)), http2Server :: HTTP2Server } data WPNotification = WPNotification { authorization :: Maybe ByteString, encoding :: Maybe ByteString, ttl :: Maybe ByteString, urgency :: Maybe ByteString, body :: ByteString } newtype APNSMockServer = APNSMockServer (PushMockServer APNSNotification) newtype WPMockServer = WPMockServer (PushMockServer WPNotification) pushMockServerConfig :: ServiceName -> HTTP2ServerConfig pushMockServerConfig port = HTTP2ServerConfig { qSize = 2, http2Port = port, bufferSize = 16384, bodyHeadSize = 16384, serverSupported = http2TLSParams, https2Credentials = ServerCredentials { caCertificateFile = Just "tests/fixtures/ca.crt", privateKeyFile = "tests/fixtures/server.key", certificateFile = "tests/fixtures/server.crt" }, transportConfig = mkTransportServerConfig True Nothing False } withAPNSMockServer :: (APNSMockServer -> IO a) -> IO a withAPNSMockServer = E.bracket (getAPNSMockServer $ pushMockServerConfig apnsTestPort) closeAPNSMockServer where closeAPNSMockServer (APNSMockServer a) = closePushMockServer a withWPMockServer :: (WPMockServer -> IO a) -> IO a withWPMockServer = E.bracket (getWPMockServer $ pushMockServerConfig wpTestPort) closeWPMockServer where closeWPMockServer (WPMockServer a) = closePushMockServer a deriving instance Generic APNSAlertBody instance FromJSON APNSAlertBody where parseJSON (J.Object v) = do title <- v .: "title" subtitle <- v .: "subtitle" body <- v .: "body" pure APNSAlertObject {title, subtitle, body} parseJSON (J.String v) = pure $ APNSAlertText v parseJSON invalid = JT.prependFailure "parsing Coord failed, " (JT.typeMismatch "Object" invalid) deriving instance Generic APNSNotificationBody instance FromJSON APNSNotificationBody where parseJSON = J.genericParseJSON apnsJSONOptions {J.rejectUnknownFields = True} deriving instance Generic APNSNotification deriving instance FromJSON APNSNotification deriving instance Generic APNSErrorResponse deriving instance ToJSON APNSErrorResponse getAPNSMockServer :: HTTP2ServerConfig -> IO APNSMockServer getAPNSMockServer config@HTTP2ServerConfig {qSize} = do http2Server <- getHTTP2Server config notifications <- TM.emptyIO action <- async $ runAPNSMockServer notifications http2Server pure $ APNSMockServer PushMockServer {action, notifications, http2Server} where runAPNSMockServer notifications HTTP2Server {reqQ} = forever $ do HTTP2Request {request, reqBody = HTTP2Body {bodyHead}, sendResponse} <- atomically $ readTBQueue reqQ let sendApnsResponse = \case PushRespOk -> sendResponse $ H.responseNoBody N.ok200 [] PushRespError status reason -> sendResponse . H.responseBuilder status [] . lazyByteString $ J.encode APNSErrorResponse {reason} case J.decodeStrict' bodyHead of Just notification -> do Just token <- pure $ B.stripPrefix "/3/device/" =<< H.requestPath request q <- atomically $ TM.lookup token notifications >>= maybe (newTokenQueue token) pure atomically $ writeTBQueue q PushMockRequest {notification} sendApnsResponse PushRespOk where newTokenQueue token = newTBQueue qSize >>= \q -> TM.insert token q notifications >> pure q _ -> do putStrLn $ "runAPNSMockServer J.decodeStrict' error, reqBody: " <> show bodyHead sendApnsResponse $ PushRespError N.badRequest400 "bad_request_body" getWPMockServer :: HTTP2ServerConfig -> IO WPMockServer getWPMockServer config@HTTP2ServerConfig {qSize} = do http2Server <- getHTTP2Server config notifications <- TM.emptyIO action <- async $ runWPMockServer notifications http2Server pure $ WPMockServer PushMockServer {action, notifications, http2Server} where runWPMockServer notifications HTTP2Server {reqQ} = forever $ do HTTP2Request {request, reqBody = HTTP2Body {bodyHead}, sendResponse} <- atomically $ readTBQueue reqQ let sendWPResponse = \case PushRespOk -> sendResponse $ H.responseNoBody N.ok200 [] PushRespError status reason -> sendResponse . H.responseBuilder status [] . lazyByteString $ J.encode $ J.object [ "error" .= reason] path = fromMaybe "/default" $ H.requestPath request (_, headers) = H.requestHeaders request authorization = H.getHeaderValue H.tokenAuthorization headers encoding = H.getHeaderValue H.tokenContentEncoding headers ttl = H.getHeaderValue (H.toToken "TTL") headers urgency = H.getHeaderValue (H.toToken "urgency") headers notification = WPNotification {body = bodyHead, authorization, encoding, ttl, urgency} q <- atomically $ TM.lookup path notifications >>= maybe (newTokenQueue path) pure atomically $ writeTBQueue q PushMockRequest {notification} sendWPResponse PushRespOk where newTokenQueue path = newTBQueue qSize >>= \q -> TM.insert path q notifications >> pure q getMockNotification :: MonadIO m => PushMockServer a -> DeviceToken -> m (PushMockRequest a) getMockNotification PushMockServer {notifications} (WPDeviceToken _ (WPTokenParams path _)) = do atomically $ TM.lookup path notifications >>= maybe retry readTBQueue getMockNotification PushMockServer {notifications} (APNSDeviceToken _ token) = do atomically $ TM.lookup token notifications >>= maybe retry readTBQueue getAnyMockNotification :: MonadIO m => PushMockServer a -> m (PushMockRequest a) getAnyMockNotification PushMockServer {notifications} = do atomically $ readTVar notifications >>= mapM readTBQueue . M.elems >>= \case [] -> retry; ntf : _ -> pure ntf closePushMockServer :: PushMockServer a -> IO () closePushMockServer PushMockServer {action, http2Server} = do closeHTTP2Server http2Server uninterruptibleCancel action