ntf-server store log (#435)

* ntf-server store log

* ntf serevr: restore log when server is started, save compacted store log

* log ntf server store changes

* test, store log works

* update ntf-server exe
This commit is contained in:
Evgeny Poberezkin
2022-06-28 17:10:50 +01:00
committed by GitHub
parent b0ac0744e2
commit 85d507d5d3
15 changed files with 394 additions and 119 deletions
+4 -1
View File
@@ -1,3 +1,4 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE NamedFieldPuns #-}
module Main where
@@ -56,7 +57,7 @@ ntfServerCLIConfig =
<> defaultServerPort
<> "\n\
\websockets: off\n",
mkServerConfig = \_storeLogFile transports _ ->
mkServerConfig = \storeLogFile transports _ ->
NtfServerConfig
{ transports,
subIdBytes = 24,
@@ -67,6 +68,8 @@ ntfServerCLIConfig =
smpAgentCfg = defaultSMPClientAgentConfig,
apnsConfig = defaultAPNSPushClientConfig,
inactiveClientExpiration = Nothing,
storeLogFile,
resubscribeDelay = 100000, -- 100ms
caCertificateFile = caCrtFile,
privateKeyFile = serverKeyFile,
certificateFile = serverCrtFile
+1
View File
@@ -62,6 +62,7 @@ library
Simplex.Messaging.Notifications.Server.Env
Simplex.Messaging.Notifications.Server.Push.APNS
Simplex.Messaging.Notifications.Server.Store
Simplex.Messaging.Notifications.Server.StoreLog
Simplex.Messaging.Notifications.Transport
Simplex.Messaging.Parsers
Simplex.Messaging.Protocol
+1 -1
View File
@@ -180,7 +180,7 @@ newAgentClient InitialAgentServers {smp, ntf} agentEnv = do
getMsgLocks <- TM.empty
reconnections <- newTVar []
asyncClients <- newTVar []
clientId <- stateTVar (clientCounter agentEnv) $ \i -> (i + 1, i + 1)
clientId <- stateTVar (clientCounter agentEnv) $ \i -> let i' = i + 1 in (i', i')
lock <- newTMVar ()
return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, subscrSrvrs, pendingSubscrSrvrs, subscrConns, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, reconnections, asyncClients, clientId, agentEnv, lock}
+12
View File
@@ -250,6 +250,12 @@ deriving instance Eq (PrivateKey a)
deriving instance Show (PrivateKey a)
instance StrEncoding (PrivateKey X25519) where
strEncode = strEncode . encodePrivKey
{-# INLINE strEncode #-}
strDecode = decodePrivKey
{-# INLINE strDecode #-}
data APrivateKey
= forall a.
AlgorithmI a =>
@@ -296,6 +302,12 @@ instance Encoding APrivateSignKey where
smpDecode = decodePrivKey
{-# INLINE smpDecode #-}
instance StrEncoding APrivateSignKey where
strEncode = strEncode . encodePrivKey
{-# INLINE strEncode #-}
strDecode = decodePrivKey
{-# INLINE strDecode #-}
data APublicVerifyKey
= forall a.
(AlgorithmI a, SignatureAlgorithm a) =>
@@ -423,6 +423,10 @@ instance Encoding NtfSubStatus where
"SMP_AUTH" -> pure NSSMPAuth
_ -> fail "bad NtfSubStatus"
instance StrEncoding NtfSubStatus where
strEncode = smpEncode
strP = smpP
data NtfTknStatus
= -- | Token created in DB
NTNew
@@ -456,6 +460,10 @@ instance Encoding NtfTknStatus where
"EXPIRED" -> pure NTExpired
_ -> fail "bad NtfTknStatus"
instance StrEncoding NtfTknStatus where
strEncode = smpEncode
strP = smpP
instance FromField NtfTknStatus where fromField = fromTextField_ $ either (const Nothing) Just . smpDecode . encodeUtf8
instance ToField NtfTknStatus where toField = toField . decodeLatin1 . smpEncode
+50 -64
View File
@@ -10,6 +10,7 @@
module Simplex.Messaging.Notifications.Server where
import Control.Concurrent.STM (stateTVar)
import Control.Logger.Simple
import Control.Monad.Except
import Control.Monad.IO.Unlift (MonadUnliftIO)
@@ -17,6 +18,7 @@ import Control.Monad.Reader
import Crypto.Random (MonadRandom)
import Data.ByteString.Char8 (ByteString)
import Data.Functor (($>))
import Data.Map.Strict (Map)
import qualified Data.Text as T
import Data.Time.Clock.System (getSystemTime)
import Network.Socket (ServiceName)
@@ -26,6 +28,7 @@ import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Notifications.Server.Env
import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..), PushNotification (..), PushProviderClient, PushProviderError (..))
import Simplex.Messaging.Notifications.Server.Store
import Simplex.Messaging.Notifications.Server.StoreLog
import Simplex.Messaging.Notifications.Transport
import Simplex.Messaging.Protocol (ErrorType (..), SMPServer, SignedTransmission, Transmission, encodeTransmission, tGet, tPut)
import qualified Simplex.Messaging.Protocol as SMP
@@ -34,8 +37,8 @@ import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ATransport (..), THandle (..), TProxy, Transport (..))
import Simplex.Messaging.Transport.Server (runTransportServer)
import Simplex.Messaging.Util
import UnliftIO (async, uninterruptibleCancel)
import UnliftIO.Concurrent (forkFinally, threadDelay)
import UnliftIO (IOMode (..), async, uninterruptibleCancel)
import UnliftIO.Concurrent (forkFinally, forkIO, threadDelay)
import UnliftIO.Exception
import UnliftIO.STM
@@ -51,7 +54,10 @@ ntfServer :: forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => NtfServerConfi
ntfServer NtfServerConfig {transports} started = do
s <- asks subscriber
ps <- asks pushServer
subs <- readTVarIO =<< asks (subscriptions . store)
void . forkIO $ resubscribe s subs
raceAny_ (ntfSubscriber s : ntfPush ps : map runServer transports)
`finally` withNtfLog closeStoreLog
where
runServer :: (ServiceName, ATransport) -> m ()
runServer (tcpPort, ATransport t) = do
@@ -65,6 +71,14 @@ ntfServer NtfServerConfig {transports} started = do
Right th -> runNtfClientTransport th
Left _ -> pure ()
resubscribe :: (MonadUnliftIO m, MonadReader NtfEnv m) => NtfSubscriber -> Map NtfSubscriptionId NtfSubData -> m ()
resubscribe NtfSubscriber {newSubQ} subs = do
d <- asks $ resubscribeDelay . config
forM_ subs $ \sub -> do
atomically $ writeTBQueue newSubQ $ NtfSub sub
threadDelay d
liftIO $ logInfo "SMP connections resubscribed"
ntfSubscriber :: forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => NtfSubscriber -> m ()
ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = do
raceAny_ [subscribe, receiveSMP, receiveAgent]
@@ -133,22 +147,25 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
-- update subscription status
pure ()
ntfPush :: forall m. MonadUnliftIO m => NtfPushServer -> m ()
ntfPush s@NtfPushServer {pushQ} = liftIO . forever . runExceptT $ do
(tkn@NtfTknData {token = DeviceToken pp _, tknStatus}, ntf) <- atomically (readTBQueue pushQ)
logDebug $ "sending push notification to " <> T.pack (show pp)
ntfPush :: forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => NtfPushServer -> m ()
ntfPush s@NtfPushServer {pushQ} = forever $ do
(tkn@NtfTknData {ntfTknId, token = DeviceToken pp _, tknStatus}, ntf) <- atomically (readTBQueue pushQ)
liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp)
status <- readTVarIO tknStatus
case (status, ntf) of
(_, PNVerification _) -> do
-- TODO check token status
deliverNotification pp tkn ntf
atomically $ modifyTVar tknStatus $ \status' -> if status' == NTActive then NTActive else NTConfirmed
liftIO (runExceptT $ deliverNotification pp tkn ntf) >>= \case
Right _ -> do
status_ <- atomically $ stateTVar tknStatus $ \status' -> if status' == NTActive then (Nothing, NTActive) else (Just NTConfirmed, NTConfirmed)
forM_ status_ $ \status' -> withNtfLog $ \sl -> logTokenStatus sl ntfTknId status'
_ -> pure ()
(NTActive, PNCheckMessages) -> do
deliverNotification pp tkn ntf
liftIO . void . runExceptT $ deliverNotification pp tkn ntf
(NTActive, PNMessage {}) -> do
deliverNotification pp tkn ntf
liftIO . void . runExceptT $ deliverNotification pp tkn ntf
_ -> do
logError "bad notification token status"
liftIO $ logError "bad notification token status"
where
deliverNotification :: PushProvider -> PushProviderClient
deliverNotification pp tkn ntf = do
@@ -277,12 +294,12 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
let dhSecret = C.dh' dhPubKey srvDhPrivKey
tknId <- getId
regCode <- getRegCode
atomically $ do
tkn <- mkNtfTknData tknId newTkn ks dhSecret regCode
addNtfToken st tknId tkn
writeTBQueue pushQ (tkn, PNVerification regCode)
tkn <- atomically $ mkNtfTknData tknId newTkn ks dhSecret regCode
atomically $ addNtfToken st tknId tkn
atomically $ writeTBQueue pushQ (tkn, PNVerification regCode)
withNtfLog (`logCreateToken` tkn)
pure (corrId, "", NRTknId tknId srvDhPubKey)
NtfReqCmd SToken (NtfTkn tkn@NtfTknData {ntfTknId, tknStatus, tknRegCode, tknDhSecret, tknDhKeys = (srvDhPubKey, srvDhPrivKey)}) (corrId, tknId, cmd) -> do
NtfReqCmd SToken (NtfTkn tkn@NtfTknData {ntfTknId, tknStatus, tknRegCode, tknDhSecret, tknDhKeys = (srvDhPubKey, srvDhPrivKey), tknCronInterval}) (corrId, tknId, cmd) -> do
status <- readTVarIO tknStatus
(corrId,tknId,) <$> case cmd of
TNEW (NewNtfTkn _ _ dhPubKey) -> do
@@ -301,6 +318,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
atomically $ writeTVar tknStatus NTActive
tIds <- atomically $ removeInactiveTokenRegistrations st tkn
forM_ tIds cancelInvervalNotifications
withNtfLog $ \s -> logTokenStatus s tknId NTActive
pure NROk
| otherwise -> do
logDebug "TVFY - incorrect code or token status"
@@ -315,6 +333,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
writeTVar tknStatus NTRegistered
addNtfToken st tknId tkn {token = token', tknRegCode = regCode}
writeTBQueue pushQ (tkn, PNVerification regCode)
withNtfLog $ \s -> logUpdateToken s tknId token' regCode
pure NROk
TDEL -> do
logDebug "TDEL"
@@ -323,21 +342,26 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
forM_ qs $ \SMPQueueNtf {smpServer, notifierId} ->
atomically $ removeSubscription ca smpServer (SPNotifier, notifierId)
cancelInvervalNotifications tknId
withNtfLog (`logDeleteToken` tknId)
pure NROk
TCRN 0 -> do
logDebug "TCRN 0"
atomically $ writeTVar tknCronInterval 0
cancelInvervalNotifications tknId
withNtfLog $ \s -> logTokenCron s tknId 0
pure NROk
TCRN int
| int < 20 -> pure $ NRErr QUOTA
| otherwise -> do
logDebug "TCRN"
atomically $ writeTVar tknCronInterval int
atomically (TM.lookup tknId intervalNotifiers) >>= \case
Nothing -> runIntervalNotifier int
Just IntervalNotifier {interval, action} ->
unless (interval == int) $ do
uninterruptibleCancel action
runIntervalNotifier int
withNtfLog $ \s -> logTokenCron s tknId int
pure NROk
where
runIntervalNotifier interval = do
@@ -352,13 +376,13 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
logDebug "SNEW - new subscription"
st <- asks store
subId <- getId
atomically $ do
sub <- mkNtfSubData newSub
(corrId,"",)
<$> ( addNtfSubscription st subId sub >>= \case
Just _ -> writeTBQueue newSubQ (NtfSub sub) $> NRSubId subId
_ -> pure $ NRErr AUTH
)
sub <- atomically $ mkNtfSubData subId newSub
resp <-
atomically (addNtfSubscription st subId sub) >>= \case
Just _ -> atomically (writeTBQueue newSubQ $ NtfSub sub) $> NRSubId subId
_ -> pure $ NRErr AUTH
withNtfLog (`logCreateSubscription` sub)
pure (corrId, "", resp)
NtfReqCmd SSubscription (NtfSub NtfSubData {smpQueue = SMPQueueNtf {smpServer, notifierId}, notifierKey = registeredNKey, subStatus}) (corrId, subId, cmd) -> do
status <- readTVarIO subStatus
(corrId,subId,) <$> case cmd of
@@ -375,6 +399,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
st <- asks store
atomically $ deleteNtfSubscription st subId
atomically $ removeSubscription ca smpServer (SPNotifier, notifierId)
withNtfLog (`logDeleteSubscription` subId)
pure NROk
PING -> pure NRPong
getId :: m NtfEntityId
@@ -390,44 +415,5 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
atomically (TM.lookupDelete tknId intervalNotifiers)
>>= mapM_ (uninterruptibleCancel . action)
-- NReqCreate corrId tokenId smpQueue -> pure (corrId, "", NROk)
-- do
-- st <- asks store
-- (pubDhKey, privDhKey) <- liftIO C.generateKeyPair'
-- let dhSecret = C.dh' dhPubKey privDhKey
-- sub <- atomically $ mkNtfSubsciption smpQueue token verifyKey dhSecret
-- addSubRetry 3 st sub >>= \case
-- Nothing -> pure (corrId, "", NRErr INTERNAL)
-- Just sId -> do
-- atomically $ writeTBQueue subQ sub
-- pure (corrId, sId, NRSubId pubDhKey)
-- where
-- addSubRetry :: Int -> NtfSubscriptionsStore -> NtfSubsciption -> m (Maybe NtfSubsciptionId)
-- addSubRetry 0 _ _ = pure Nothing
-- addSubRetry n st sub = do
-- sId <- getId
-- -- create QueueRec record with these ids and keys
-- atomically (addNtfSub st sId sub) >>= \case
-- Nothing -> addSubRetry (n - 1) st sub
-- _ -> pure $ Just sId
-- getId :: m NtfSubsciptionId
-- getId = do
-- n <- asks $ subIdBytes . config
-- gVar <- asks idsDrg
-- atomically (randomBytes n gVar)
-- NReqCommand sub@NtfSubsciption {tokenId, subStatus} (corrId, subId, cmd) ->
-- (corrId,subId,) <$> case cmd of
-- NCSubCreate tokenId smpQueue -> pure NROk
-- do
-- st <- asks store
-- (pubDhKey, privDhKey) <- liftIO C.generateKeyPair'
-- let dhSecret = C.dh' (dhPubKey newSub) privDhKey
-- atomically (updateNtfSub st sub newSub dhSecret) >>= \case
-- Nothing -> pure $ NRErr INTERNAL
-- _ -> atomically $ do
-- whenM ((== NSEnd) <$> readTVar status) $ writeTBQueue subQ sub
-- pure $ NRSubId pubDhKey
-- NCSubCheck -> NRStat <$> readTVarIO subStatus
-- NCSubDelete -> do
-- st <- asks store
-- atomically (deleteNtfSub st subId) $> NROk
withNtfLog :: (MonadUnliftIO m, MonadReader NtfEnv m) => (StoreLog 'WriteMode -> IO a) -> m ()
withNtfLog action = liftIO . mapM_ action =<< asks storeLog
@@ -22,12 +22,14 @@ import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Notifications.Server.Push.APNS
import Simplex.Messaging.Notifications.Server.Store
import Simplex.Messaging.Notifications.Server.StoreLog
import Simplex.Messaging.Protocol (CorrId, SMPServer, Transmission)
import Simplex.Messaging.Server.Expiration
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (ATransport)
import Simplex.Messaging.Transport.Server (loadFingerprint, loadTLSServerParams)
import System.IO (IOMode (..))
import UnliftIO.STM
data NtfServerConfig = NtfServerConfig
@@ -40,6 +42,8 @@ data NtfServerConfig = NtfServerConfig
smpAgentCfg :: SMPClientAgentConfig,
apnsConfig :: APNSPushClientConfig,
inactiveClientExpiration :: Maybe ExpirationConfig,
storeLogFile :: Maybe FilePath,
resubscribeDelay :: Int, -- microseconds
-- CA certificate private key is not needed for initialization
caCertificateFile :: FilePath,
privateKeyFile :: FilePath,
@@ -58,6 +62,7 @@ data NtfEnv = NtfEnv
subscriber :: NtfSubscriber,
pushServer :: NtfPushServer,
store :: NtfStore,
storeLog :: Maybe (StoreLog 'WriteMode),
idsDrg :: TVar ChaChaDRG,
serverIdentity :: C.KeyHash,
tlsServerParams :: T.ServerParams,
@@ -65,14 +70,15 @@ data NtfEnv = NtfEnv
}
newNtfServerEnv :: (MonadUnliftIO m, MonadRandom m) => NtfServerConfig -> m NtfEnv
newNtfServerEnv config@NtfServerConfig {subQSize, pushQSize, smpAgentCfg, apnsConfig, caCertificateFile, certificateFile, privateKeyFile} = do
newNtfServerEnv config@NtfServerConfig {subQSize, pushQSize, smpAgentCfg, apnsConfig, storeLogFile, caCertificateFile, certificateFile, privateKeyFile} = do
idsDrg <- newTVarIO =<< drgNew
store <- atomically newNtfStore
storeLog <- liftIO $ mapM (`readWriteNtfStore` store) storeLogFile
subscriber <- atomically $ newNtfSubscriber subQSize smpAgentCfg
pushServer <- atomically $ newNtfPushServer pushQSize apnsConfig
tlsServerParams <- liftIO $ loadTLSServerParams caCertificateFile certificateFile privateKeyFile
Fingerprint fp <- liftIO $ loadFingerprint caCertificateFile
pure NtfEnv {config, subscriber, pushServer, store, idsDrg, tlsServerParams, serverIdentity = C.KeyHash fp}
pure NtfEnv {config, subscriber, pushServer, store, storeLog, idsDrg, tlsServerParams, serverIdentity = C.KeyHash fp}
data NtfSubscriber = NtfSubscriber
{ smpSubscribers :: TMap SMPServer SMPSubscriber,
@@ -10,6 +10,7 @@
module Simplex.Messaging.Notifications.Server.Push.APNS where
import Control.Exception (Exception)
import Control.Logger.Simple
import Control.Monad.Except
import Crypto.Hash.Algorithms (SHA256 (..))
@@ -320,7 +321,7 @@ data PushProviderError
| PPTokenInvalid
| PPRetryLater
| PPPermanentError
deriving (Show)
deriving (Show, Exception)
type PushProviderClient = NtfTknData -> PushNotification -> ExceptT PushProviderError IO ()
@@ -17,6 +17,7 @@ import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes)
import Data.Set (Set)
import qualified Data.Set as S
import Data.Word (Word16)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Protocol (NtfPrivateSignKey)
@@ -49,26 +50,19 @@ data NtfTknData = NtfTknData
tknVerifyKey :: C.APublicVerifyKey,
tknDhKeys :: C.KeyPair 'C.X25519,
tknDhSecret :: C.DhSecretX25519,
tknRegCode :: NtfRegCode
tknRegCode :: NtfRegCode,
tknCronInterval :: TVar Word16
}
mkNtfTknData :: NtfTokenId -> NewNtfEntity 'Token -> C.KeyPair 'C.X25519 -> C.DhSecretX25519 -> NtfRegCode -> STM NtfTknData
mkNtfTknData ntfTknId (NewNtfTkn token tknVerifyKey _) tknDhKeys tknDhSecret tknRegCode = do
tknStatus <- newTVar NTRegistered
pure NtfTknData {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode}
-- data NtfSubscriptionsStore = NtfSubscriptionsStore
-- { subscriptions :: TMap NtfSubsciptionId NtfSubsciption,
-- activeSubscriptions :: TMap (SMPServer, NotifierId) NtfSubsciptionId
-- }
-- do
-- subscriptions <- newTVar M.empty
-- activeSubscriptions <- newTVar M.empty
-- pure NtfSubscriptionsStore {subscriptions, activeSubscriptions}
tknCronInterval <- newTVar 0
pure NtfTknData {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval}
data NtfSubData = NtfSubData
{ smpQueue :: SMPQueueNtf,
{ ntfSubId :: NtfSubscriptionId,
smpQueue :: SMPQueueNtf,
notifierKey :: NtfPrivateSignKey,
tokenId :: NtfTokenId,
subStatus :: TVar NtfSubStatus
@@ -175,10 +169,10 @@ getActiveNtfToken st tknId =
tStatus <- readTVar tknStatus
pure $ if tStatus == NTActive then Just tkn else Nothing
mkNtfSubData :: NewNtfEntity 'Subscription -> STM NtfSubData
mkNtfSubData (NewNtfSub tokenId smpQueue notifierKey) = do
mkNtfSubData :: NtfSubscriptionId -> NewNtfEntity 'Subscription -> STM NtfSubData
mkNtfSubData ntfSubId (NewNtfSub tokenId smpQueue notifierKey) = do
subStatus <- newTVar NSNew
pure NtfSubData {smpQueue, tokenId, subStatus, notifierKey}
pure NtfSubData {ntfSubId, smpQueue, tokenId, subStatus, notifierKey}
addNtfSubscription :: NtfStore -> NtfSubscriptionId -> NtfSubData -> STM (Maybe ())
addNtfSubscription st subId sub@NtfSubData {smpQueue, tokenId} =
@@ -0,0 +1,227 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
module Simplex.Messaging.Notifications.Server.StoreLog
( StoreLog,
NtfStoreLogRecord (..),
readWriteNtfStore,
logCreateToken,
logTokenStatus,
logUpdateToken,
logTokenCron,
logDeleteToken,
logCreateSubscription,
logSubscriptionStatus,
logDeleteSubscription,
closeStoreLog,
)
where
import Control.Concurrent.STM
import Control.Monad (void)
import qualified Data.Attoparsec.ByteString.Char8 as A
import qualified Data.ByteString.Char8 as B
import Data.Word (Word16)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Notifications.Server.Store
import Simplex.Messaging.Protocol (NtfPrivateSignKey)
import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.Util (whenM)
import System.Directory (doesFileExist, renameFile)
import System.IO
data NtfStoreLogRecord
= CreateToken NtfTknRec
| TokenStatus NtfTokenId NtfTknStatus
| UpdateToken NtfTokenId DeviceToken NtfRegCode
| TokenCron NtfTokenId Word16
| DeleteToken NtfTokenId
| CreateSubscription NtfSubRec
| SubscriptionStatus NtfSubscriptionId NtfSubStatus
| DeleteSubscription NtfSubscriptionId
data NtfTknRec = NtfTknRec
{ ntfTknId :: NtfTokenId,
token :: DeviceToken,
tknStatus :: NtfTknStatus,
tknVerifyKey :: C.APublicVerifyKey,
tknDhKeys :: C.KeyPair 'C.X25519,
tknDhSecret :: C.DhSecretX25519,
tknRegCode :: NtfRegCode,
tknCronInterval :: Word16
}
mkTknData :: NtfTknRec -> STM NtfTknData
mkTknData NtfTknRec {ntfTknId, token, tknStatus = status, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval = cronInt} = do
tknStatus <- newTVar status
tknCronInterval <- newTVar cronInt
pure NtfTknData {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval}
mkTknRec :: NtfTknData -> STM NtfTknRec
mkTknRec NtfTknData {ntfTknId, token, tknStatus = status, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval = cronInt} = do
tknStatus <- readTVar status
tknCronInterval <- readTVar cronInt
pure NtfTknRec {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval}
data NtfSubRec = NtfSubRec
{ ntfSubId :: NtfSubscriptionId,
smpQueue :: SMPQueueNtf,
notifierKey :: NtfPrivateSignKey,
tokenId :: NtfTokenId,
subStatus :: NtfSubStatus
}
mkSubData :: NtfSubRec -> STM NtfSubData
mkSubData NtfSubRec {ntfSubId, smpQueue, notifierKey, tokenId, subStatus = status} = do
subStatus <- newTVar status
pure NtfSubData {ntfSubId, smpQueue, notifierKey, tokenId, subStatus}
mkSubRec :: NtfSubData -> STM NtfSubRec
mkSubRec NtfSubData {ntfSubId, smpQueue, notifierKey, tokenId, subStatus = status} = do
subStatus <- readTVar status
pure NtfSubRec {ntfSubId, smpQueue, notifierKey, tokenId, subStatus}
instance StrEncoding NtfStoreLogRecord where
strEncode = \case
CreateToken tknRec -> strEncode (Str "TCREATE", tknRec)
TokenStatus tknId tknStatus -> strEncode (Str "TSTATUS", tknId, tknStatus)
UpdateToken tknId token regCode -> strEncode (Str "TUPDATE", tknId, token, regCode)
TokenCron tknId cronInt -> strEncode (Str "TCRON", tknId, cronInt)
DeleteToken tknId -> strEncode (Str "TDELETE", tknId)
CreateSubscription subRec -> strEncode (Str "SCREATE", subRec)
SubscriptionStatus subId subStatus -> strEncode (Str "SSTATUS", subId, subStatus)
DeleteSubscription subId -> strEncode (Str "SDELETE", subId)
strP =
A.choice
[ "TCREATE " *> (CreateToken <$> strP),
"TSTATUS " *> (TokenStatus <$> strP_ <*> strP),
"TUPDATE " *> (UpdateToken <$> strP_ <*> strP_ <*> strP),
"TCRON " *> (TokenCron <$> strP_ <*> strP),
"TDELETE " *> (DeleteToken <$> strP),
"SCREATE " *> (CreateSubscription <$> strP),
"SSTATUS " *> (SubscriptionStatus <$> strP_ <*> strP),
"SDELETE " *> (DeleteSubscription <$> strP)
]
instance StrEncoding NtfTknRec where
strEncode NtfTknRec {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval} =
B.unwords
[ "tknId=" <> strEncode ntfTknId,
"token=" <> strEncode token,
"tokenStatus=" <> strEncode tknStatus,
"verifyKey=" <> strEncode tknVerifyKey,
"dhKeys=" <> strEncode tknDhKeys,
"dhSecret=" <> strEncode tknDhSecret,
"regCode=" <> strEncode tknRegCode,
"cron=" <> strEncode tknCronInterval
]
strP = do
ntfTknId <- "tknId=" *> strP_
token <- "token=" *> strP_
tknStatus <- "tokenStatus=" *> strP_
tknVerifyKey <- "verifyKey=" *> strP_
tknDhKeys <- "dhKeys=" *> strP_
tknDhSecret <- "dhSecret=" *> strP_
tknRegCode <- "regCode=" *> strP_
tknCronInterval <- "cron=" *> strP
pure NtfTknRec {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval}
instance StrEncoding NtfSubRec where
strEncode NtfSubRec {ntfSubId, smpQueue, notifierKey, tokenId, subStatus} =
B.unwords
[ "subId=" <> strEncode ntfSubId,
"smpQueue=" <> strEncode smpQueue,
"notifierKey=" <> strEncode notifierKey,
"tknId=" <> strEncode tokenId,
"subStatus=" <> strEncode subStatus
]
strP = do
ntfSubId <- "subId=" *> strP_
smpQueue <- "smpQueue=" *> strP_
notifierKey <- "notifierKey=" *> strP_
tokenId <- "tknId=" *> strP_
subStatus <- "subStatus=" *> strP
pure NtfSubRec {ntfSubId, smpQueue, notifierKey, tokenId, subStatus}
logNtfStoreRecord :: StoreLog 'WriteMode -> NtfStoreLogRecord -> IO ()
logNtfStoreRecord = writeStoreLogRecord
logCreateToken :: StoreLog 'WriteMode -> NtfTknData -> IO ()
logCreateToken s tkn = logNtfStoreRecord s . CreateToken =<< atomically (mkTknRec tkn)
logTokenStatus :: StoreLog 'WriteMode -> NtfTokenId -> NtfTknStatus -> IO ()
logTokenStatus s tknId tknStatus = logNtfStoreRecord s $ TokenStatus tknId tknStatus
logUpdateToken :: StoreLog 'WriteMode -> NtfTokenId -> DeviceToken -> NtfRegCode -> IO ()
logUpdateToken s tknId token regCode = logNtfStoreRecord s $ UpdateToken tknId token regCode
logTokenCron :: StoreLog 'WriteMode -> NtfTokenId -> Word16 -> IO ()
logTokenCron s tknId cronInt = logNtfStoreRecord s $ TokenCron tknId cronInt
logDeleteToken :: StoreLog 'WriteMode -> NtfTokenId -> IO ()
logDeleteToken s tknId = logNtfStoreRecord s $ DeleteToken tknId
logCreateSubscription :: StoreLog 'WriteMode -> NtfSubData -> IO ()
logCreateSubscription s sub = logNtfStoreRecord s . CreateSubscription =<< atomically (mkSubRec sub)
logSubscriptionStatus :: StoreLog 'WriteMode -> NtfSubscriptionId -> NtfSubStatus -> IO ()
logSubscriptionStatus s subId subStatus = logNtfStoreRecord s $ SubscriptionStatus subId subStatus
logDeleteSubscription :: StoreLog 'WriteMode -> NtfSubscriptionId -> IO ()
logDeleteSubscription s subId = logNtfStoreRecord s $ DeleteSubscription subId
readWriteNtfStore :: FilePath -> NtfStore -> IO (StoreLog 'WriteMode)
readWriteNtfStore f st = do
whenM (doesFileExist f) $ do
readNtfStore f st
renameFile f $ f <> ".bak"
s <- openWriteStoreLog f
writeNtfStore s st
pure s
readNtfStore :: FilePath -> NtfStore -> IO ()
readNtfStore f st = mapM_ addNtfLogRecord . B.lines =<< B.readFile f
where
addNtfLogRecord s = case strDecode s of
Left e -> B.putStrLn $ "Log parsing error (" <> B.pack e <> "): " <> B.take 100 s
Right lr -> atomically $ case lr of
CreateToken r@NtfTknRec {ntfTknId} -> do
tkn <- mkTknData r
addNtfToken st ntfTknId tkn
TokenStatus tknId status ->
getNtfToken st tknId
>>= mapM_ (\NtfTknData {tknStatus} -> writeTVar tknStatus status)
UpdateToken tknId token' tknRegCode ->
getNtfToken st tknId
>>= mapM_
( \tkn@NtfTknData {tknStatus} -> do
removeTokenRegistration st tkn
writeTVar tknStatus NTRegistered
addNtfToken st tknId tkn {token = token', tknRegCode}
)
TokenCron tknId cronInt ->
getNtfToken st tknId
>>= mapM_ (\NtfTknData {tknCronInterval} -> writeTVar tknCronInterval cronInt)
DeleteToken tknId ->
void $ deleteNtfToken st tknId
CreateSubscription r@NtfSubRec {ntfSubId} -> do
sub <- mkSubData r
void $ addNtfSubscription st ntfSubId sub
SubscriptionStatus subId status ->
getNtfSubscription st subId
>>= mapM_ (\NtfSubData {subStatus} -> writeTVar subStatus status)
DeleteSubscription subId ->
deleteNtfSubscription st subId
writeNtfStore :: StoreLog 'WriteMode -> NtfStore -> IO ()
writeNtfStore s NtfStore {tokens, subscriptions} = do
atomically (readTVar tokens >>= mapM mkTknRec)
>>= mapM_ (writeStoreLogRecord s . CreateToken)
atomically (readTVar subscriptions >>= mapM mkSubRec)
>>= mapM_ (writeStoreLogRecord s . CreateSubscription)
+6 -2
View File
@@ -13,6 +13,7 @@ module Simplex.Messaging.Server.StoreLog
openReadStoreLog,
storeLogFilePath,
closeStoreLog,
writeStoreLogRecord,
logCreateQueue,
logSecureQueue,
logAddNotifier,
@@ -91,7 +92,10 @@ instance StrEncoding StoreLogRecord where
<|> "NDELETE" *> (DeleteNotifier <$> strP)
openWriteStoreLog :: FilePath -> IO (StoreLog 'WriteMode)
openWriteStoreLog f = WriteStoreLog f <$> openFile f WriteMode
openWriteStoreLog f = do
h <- openFile f WriteMode
hSetBuffering h LineBuffering
pure $ WriteStoreLog f h
openReadStoreLog :: FilePath -> IO (StoreLog 'ReadMode)
openReadStoreLog f = do
@@ -108,7 +112,7 @@ closeStoreLog = \case
WriteStoreLog _ h -> hClose h
ReadStoreLog _ h -> hClose h
writeStoreLogRecord :: StoreLog 'WriteMode -> StoreLogRecord -> IO ()
writeStoreLogRecord :: StrEncoding r => StoreLog 'WriteMode -> r -> IO ()
writeStoreLogRecord (WriteStoreLog _ h) r = do
B.hPutStrLn h $ strEncode r
hFlush h
+1 -1
View File
@@ -54,7 +54,7 @@ runTCPServer started port server = do
(closeServer started clients)
$ \sock -> forever . E.bracketOnError (accept sock) (close . fst) $ \(conn, _peer) -> do
-- catchAll_ is needed here in case the connection was closed earlier
cId <- atomically $ stateTVar clientId $ \cId -> (cId + 1, cId + 1)
cId <- atomically $ stateTVar clientId $ \cId -> let cId' = cId + 1 in (cId', cId')
let closeConn _ = atomically (TM.delete cId clients) >> gracefulClose conn 5000 `catchAll_` pure ()
tId <- mkWeakThreadId =<< server conn `forkFinally` closeConn
atomically $ TM.insert cId tId clients
+1
View File
@@ -8,6 +8,7 @@
module AgentTests.FunctionalAPITests
( functionalAPITests,
makeConnection,
get,
(##>),
(=##>),
+50 -29
View File
@@ -9,8 +9,8 @@ module AgentTests.NotificationTests where
-- import Control.Logger.Simple (LogConfig (..), LogLevel (..), setLogLevel, withGlobalLogging)
import AgentTests.FunctionalAPITests (get, (##>), (=##>), pattern Msg)
import Control.Concurrent (threadDelay)
import AgentTests.FunctionalAPITests (get, makeConnection, (##>), (=##>), pattern Msg)
import Control.Concurrent (killThread, threadDelay)
import Control.Monad.Except
import qualified Data.Aeson as J
import qualified Data.Aeson.Types as JT
@@ -69,6 +69,11 @@ notificationTests t =
withSmpServer t $
withAPNSMockServer $ \apns ->
withNtfServer t $ testChangeNotificationsMode apns
describe "Notifications server store log" $ do
it "should save and restore tokens and subscriptions" $ \_ ->
withSmpServer t $
withAPNSMockServer $ \apns ->
testNotificationsStoreLog t apns
testNotificationToken :: APNSMockServer -> IO ()
testNotificationToken APNSMockServer {apnsQ} = do
@@ -253,25 +258,9 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do
bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers
Right () <- runExceptT $ do
-- alice registers notification token
let aliceTkn = DeviceToken PPApnsTest "abcd"
NTRegistered <- registerNtfToken alice aliceTkn NMInstant
APNSMockRequest {notification = APNSNotification {aps = APNSBackground _, notificationData = Just ntfData}, sendApnsResponse} <-
atomically $ readTBQueue apnsQ
verification <- ntfData .-> "verification"
nonce <- C.cbNonce <$> ntfData .-> "nonce"
liftIO $ sendApnsResponse APNSRespOk
verifyNtfToken alice aliceTkn nonce verification
NTActive <- checkNtfToken alice aliceTkn
_ <- registerTestToken alice "abcd" NMInstant apnsQ
-- bob registers notification token
let bobTkn = DeviceToken PPApnsTest "bcde"
NTRegistered <- registerNtfToken bob bobTkn NMInstant
APNSMockRequest {notification = APNSNotification {aps = APNSBackground _, notificationData = Just ntfData'}, sendApnsResponse = sendApnsResponse'} <-
atomically $ readTBQueue apnsQ
verification' <- ntfData' .-> "verification"
nonce' <- C.cbNonce <$> ntfData' .-> "nonce"
liftIO $ sendApnsResponse' APNSRespOk
verifyNtfToken bob bobTkn nonce' verification'
NTActive <- checkNtfToken bob bobTkn
_ <- registerTestToken bob "bcde" NMInstant apnsQ
-- establish connection
liftIO $ threadDelay 50000
(bobId, qInfo) <- createConnection alice SCMInvitation
@@ -308,6 +297,19 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do
baseId = 3
msgId = subtract baseId
registerTestToken :: AgentClient -> ByteString -> NotificationsMode -> TBQueue APNSMockRequest -> ExceptT AgentErrorType IO DeviceToken
registerTestToken a token mode apnsQ = do
let tkn = DeviceToken PPApnsTest token
NTRegistered <- registerNtfToken a tkn mode
APNSMockRequest {notification = APNSNotification {aps = APNSBackground _, notificationData = Just ntfData'}, sendApnsResponse = sendApnsResponse'} <-
atomically $ readTBQueue apnsQ
verification' <- ntfData' .-> "verification"
nonce' <- C.cbNonce <$> ntfData' .-> "nonce"
liftIO $ sendApnsResponse' APNSRespOk
verifyNtfToken a tkn nonce' verification'
NTActive <- checkNtfToken a tkn
pure tkn
testChangeNotificationsMode :: APNSMockServer -> IO ()
testChangeNotificationsMode APNSMockServer {apnsQ} = do
alice <- getSMPAgentClient agentCfg initAgentServers
@@ -322,15 +324,7 @@ testChangeNotificationsMode APNSMockServer {apnsQ} = do
get alice ##> ("", bobId, CON)
get bob ##> ("", aliceId, CON)
-- register notification token, set mode to NMPeriodic
let tkn = DeviceToken PPApnsTest "abcd"
NTRegistered <- registerNtfToken alice tkn NMPeriodic
APNSMockRequest {notification = APNSNotification {aps = APNSBackground _, notificationData = Just ntfData}, sendApnsResponse} <-
atomically $ readTBQueue apnsQ
verification <- ntfData .-> "verification"
vNonce <- C.cbNonce <$> ntfData .-> "nonce"
liftIO $ sendApnsResponse APNSRespOk
verifyNtfToken alice tkn vNonce verification
NTActive <- checkNtfToken alice tkn
tkn <- registerTestToken alice "abcd" NMPeriodic apnsQ
-- send message, no notification
1 <- msgId <$> sendMessage bob aliceId (SMP.MsgFlags True) "hello"
get bob ##> ("", aliceId, SENT $ baseId + 1)
@@ -362,6 +356,33 @@ testChangeNotificationsMode APNSMockServer {apnsQ} = do
baseId = 3
msgId = subtract baseId
testNotificationsStoreLog :: ATransport -> APNSMockServer -> IO ()
testNotificationsStoreLog t APNSMockServer {apnsQ} = do
alice <- getSMPAgentClient agentCfg initAgentServers
bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers
Right (aliceId, bobId) <- withNtfServerStoreLog t $ \threadId -> runExceptT $ do
_ <- registerTestToken alice "abcd" NMInstant apnsQ
(aliceId, bobId) <- makeConnection alice bob
liftIO $ threadDelay 250000
4 <- sendMessage bob aliceId (SMP.MsgFlags True) "hello"
get bob ##> ("", aliceId, SENT 4)
void $ messageNotification apnsQ
get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False
ackMessage alice bobId 4
liftIO $ killThread threadId
pure (aliceId, bobId)
liftIO $ threadDelay 250000
Right () <- withNtfServerStoreLog t $ \threadId -> runExceptT $ do
liftIO $ threadDelay 250000
5 <- sendMessage bob aliceId (SMP.MsgFlags True) "hello again"
get bob ##> ("", aliceId, SENT 5)
void $ messageNotification apnsQ
get alice =##> \case ("", c, Msg "hello again") -> c == bobId; _ -> False
liftIO $ killThread threadId
pure ()
messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString)
messageNotification apnsQ = do
500000 `timeout` atomically (readTBQueue apnsQ) >>= \case
+13 -2
View File
@@ -62,6 +62,9 @@ apnsTestPort = "6010"
testKeyHash :: C.KeyHash
testKeyHash = "LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI="
ntfTestStoreLogFile :: FilePath
ntfTestStoreLogFile = "tests/tmp/ntf-server-store.log"
testNtfClient :: (Transport c, MonadUnliftIO m) => (THandle c -> m a) -> m a
testNtfClient client =
runTransportClient testHost ntfTestPort (Just testKeyHash) (Just defaultKeepAliveOpts) $ \h ->
@@ -85,16 +88,24 @@ ntfServerCfg =
http2cfg = defaultHTTP2ClientConfig {caStoreFile = "tests/fixtures/ca.crt"}
},
inactiveClientExpiration = Just defaultInactiveClientExpiration,
storeLogFile = Nothing,
resubscribeDelay = 1000,
-- CA certificate private key is not needed for initialization
caCertificateFile = "tests/fixtures/ca.crt",
privateKeyFile = "tests/fixtures/server.key",
certificateFile = "tests/fixtures/server.crt"
}
withNtfServerStoreLog :: (MonadUnliftIO m, MonadRandom m) => ATransport -> (ThreadId -> m a) -> m a
withNtfServerStoreLog t = withNtfServerCfg t ntfServerCfg {storeLogFile = Just ntfTestStoreLogFile}
withNtfServerThreadOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServiceName -> (ThreadId -> m a) -> m a
withNtfServerThreadOn t port' =
withNtfServerThreadOn t port' = withNtfServerCfg t ntfServerCfg {transports = [(port', t)]}
withNtfServerCfg :: (MonadUnliftIO m, MonadRandom m) => ATransport -> NtfServerConfig -> (ThreadId -> m a) -> m a
withNtfServerCfg t cfg =
serverBracket
(\started -> runNtfServerBlocking started ntfServerCfg {transports = [(port', t)]})
(\started -> runNtfServerBlocking started cfg {transports = [(ntfTestPort, t)]})
(pure ())
serverBracket :: MonadUnliftIO m => (TMVar Bool -> m ()) -> m () -> (ThreadId -> m a) -> m a