diff --git a/apps/ntf-server/Main.hs b/apps/ntf-server/Main.hs index 6d40a8bdc..d17929a65 100644 --- a/apps/ntf-server/Main.hs +++ b/apps/ntf-server/Main.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE NamedFieldPuns #-} module Main where @@ -56,7 +57,7 @@ ntfServerCLIConfig = <> defaultServerPort <> "\n\ \websockets: off\n", - mkServerConfig = \_storeLogFile transports _ -> + mkServerConfig = \storeLogFile transports _ -> NtfServerConfig { transports, subIdBytes = 24, @@ -67,6 +68,8 @@ ntfServerCLIConfig = smpAgentCfg = defaultSMPClientAgentConfig, apnsConfig = defaultAPNSPushClientConfig, inactiveClientExpiration = Nothing, + storeLogFile, + resubscribeDelay = 100000, -- 100ms caCertificateFile = caCrtFile, privateKeyFile = serverKeyFile, certificateFile = serverCrtFile diff --git a/simplexmq.cabal b/simplexmq.cabal index 318dd8dc9..5ba13f091 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -62,6 +62,7 @@ library Simplex.Messaging.Notifications.Server.Env Simplex.Messaging.Notifications.Server.Push.APNS Simplex.Messaging.Notifications.Server.Store + Simplex.Messaging.Notifications.Server.StoreLog Simplex.Messaging.Notifications.Transport Simplex.Messaging.Parsers Simplex.Messaging.Protocol diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index c0f661580..e23cf3f09 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -180,7 +180,7 @@ newAgentClient InitialAgentServers {smp, ntf} agentEnv = do getMsgLocks <- TM.empty reconnections <- newTVar [] asyncClients <- newTVar [] - clientId <- stateTVar (clientCounter agentEnv) $ \i -> (i + 1, i + 1) + clientId <- stateTVar (clientCounter agentEnv) $ \i -> let i' = i + 1 in (i', i') lock <- newTMVar () return AgentClient {active, rcvQ, subQ, msgQ, smpServers, smpClients, ntfServers, ntfClients, subscrSrvrs, pendingSubscrSrvrs, subscrConns, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, rcvNetworkOp, msgDeliveryOp, sndNetworkOp, databaseOp, agentState, getMsgLocks, reconnections, asyncClients, clientId, agentEnv, lock} diff --git a/src/Simplex/Messaging/Crypto.hs b/src/Simplex/Messaging/Crypto.hs index 0f0494606..81dc8e6d5 100644 --- a/src/Simplex/Messaging/Crypto.hs +++ b/src/Simplex/Messaging/Crypto.hs @@ -250,6 +250,12 @@ deriving instance Eq (PrivateKey a) deriving instance Show (PrivateKey a) +instance StrEncoding (PrivateKey X25519) where + strEncode = strEncode . encodePrivKey + {-# INLINE strEncode #-} + strDecode = decodePrivKey + {-# INLINE strDecode #-} + data APrivateKey = forall a. AlgorithmI a => @@ -296,6 +302,12 @@ instance Encoding APrivateSignKey where smpDecode = decodePrivKey {-# INLINE smpDecode #-} +instance StrEncoding APrivateSignKey where + strEncode = strEncode . encodePrivKey + {-# INLINE strEncode #-} + strDecode = decodePrivKey + {-# INLINE strDecode #-} + data APublicVerifyKey = forall a. (AlgorithmI a, SignatureAlgorithm a) => diff --git a/src/Simplex/Messaging/Notifications/Protocol.hs b/src/Simplex/Messaging/Notifications/Protocol.hs index b9deb45d2..789a11136 100644 --- a/src/Simplex/Messaging/Notifications/Protocol.hs +++ b/src/Simplex/Messaging/Notifications/Protocol.hs @@ -423,6 +423,10 @@ instance Encoding NtfSubStatus where "SMP_AUTH" -> pure NSSMPAuth _ -> fail "bad NtfSubStatus" +instance StrEncoding NtfSubStatus where + strEncode = smpEncode + strP = smpP + data NtfTknStatus = -- | Token created in DB NTNew @@ -456,6 +460,10 @@ instance Encoding NtfTknStatus where "EXPIRED" -> pure NTExpired _ -> fail "bad NtfTknStatus" +instance StrEncoding NtfTknStatus where + strEncode = smpEncode + strP = smpP + instance FromField NtfTknStatus where fromField = fromTextField_ $ either (const Nothing) Just . smpDecode . encodeUtf8 instance ToField NtfTknStatus where toField = toField . decodeLatin1 . smpEncode diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index fbae87dc5..7a54d937b 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -10,6 +10,7 @@ module Simplex.Messaging.Notifications.Server where +import Control.Concurrent.STM (stateTVar) import Control.Logger.Simple import Control.Monad.Except import Control.Monad.IO.Unlift (MonadUnliftIO) @@ -17,6 +18,7 @@ import Control.Monad.Reader import Crypto.Random (MonadRandom) import Data.ByteString.Char8 (ByteString) import Data.Functor (($>)) +import Data.Map.Strict (Map) import qualified Data.Text as T import Data.Time.Clock.System (getSystemTime) import Network.Socket (ServiceName) @@ -26,6 +28,7 @@ import Simplex.Messaging.Notifications.Protocol import Simplex.Messaging.Notifications.Server.Env import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..), PushNotification (..), PushProviderClient, PushProviderError (..)) import Simplex.Messaging.Notifications.Server.Store +import Simplex.Messaging.Notifications.Server.StoreLog import Simplex.Messaging.Notifications.Transport import Simplex.Messaging.Protocol (ErrorType (..), SMPServer, SignedTransmission, Transmission, encodeTransmission, tGet, tPut) import qualified Simplex.Messaging.Protocol as SMP @@ -34,8 +37,8 @@ import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (ATransport (..), THandle (..), TProxy, Transport (..)) import Simplex.Messaging.Transport.Server (runTransportServer) import Simplex.Messaging.Util -import UnliftIO (async, uninterruptibleCancel) -import UnliftIO.Concurrent (forkFinally, threadDelay) +import UnliftIO (IOMode (..), async, uninterruptibleCancel) +import UnliftIO.Concurrent (forkFinally, forkIO, threadDelay) import UnliftIO.Exception import UnliftIO.STM @@ -51,7 +54,10 @@ ntfServer :: forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => NtfServerConfi ntfServer NtfServerConfig {transports} started = do s <- asks subscriber ps <- asks pushServer + subs <- readTVarIO =<< asks (subscriptions . store) + void . forkIO $ resubscribe s subs raceAny_ (ntfSubscriber s : ntfPush ps : map runServer transports) + `finally` withNtfLog closeStoreLog where runServer :: (ServiceName, ATransport) -> m () runServer (tcpPort, ATransport t) = do @@ -65,6 +71,14 @@ ntfServer NtfServerConfig {transports} started = do Right th -> runNtfClientTransport th Left _ -> pure () +resubscribe :: (MonadUnliftIO m, MonadReader NtfEnv m) => NtfSubscriber -> Map NtfSubscriptionId NtfSubData -> m () +resubscribe NtfSubscriber {newSubQ} subs = do + d <- asks $ resubscribeDelay . config + forM_ subs $ \sub -> do + atomically $ writeTBQueue newSubQ $ NtfSub sub + threadDelay d + liftIO $ logInfo "SMP connections resubscribed" + ntfSubscriber :: forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => NtfSubscriber -> m () ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = do raceAny_ [subscribe, receiveSMP, receiveAgent] @@ -133,22 +147,25 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge -- update subscription status pure () -ntfPush :: forall m. MonadUnliftIO m => NtfPushServer -> m () -ntfPush s@NtfPushServer {pushQ} = liftIO . forever . runExceptT $ do - (tkn@NtfTknData {token = DeviceToken pp _, tknStatus}, ntf) <- atomically (readTBQueue pushQ) - logDebug $ "sending push notification to " <> T.pack (show pp) +ntfPush :: forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => NtfPushServer -> m () +ntfPush s@NtfPushServer {pushQ} = forever $ do + (tkn@NtfTknData {ntfTknId, token = DeviceToken pp _, tknStatus}, ntf) <- atomically (readTBQueue pushQ) + liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp) status <- readTVarIO tknStatus case (status, ntf) of (_, PNVerification _) -> do -- TODO check token status - deliverNotification pp tkn ntf - atomically $ modifyTVar tknStatus $ \status' -> if status' == NTActive then NTActive else NTConfirmed + liftIO (runExceptT $ deliverNotification pp tkn ntf) >>= \case + Right _ -> do + status_ <- atomically $ stateTVar tknStatus $ \status' -> if status' == NTActive then (Nothing, NTActive) else (Just NTConfirmed, NTConfirmed) + forM_ status_ $ \status' -> withNtfLog $ \sl -> logTokenStatus sl ntfTknId status' + _ -> pure () (NTActive, PNCheckMessages) -> do - deliverNotification pp tkn ntf + liftIO . void . runExceptT $ deliverNotification pp tkn ntf (NTActive, PNMessage {}) -> do - deliverNotification pp tkn ntf + liftIO . void . runExceptT $ deliverNotification pp tkn ntf _ -> do - logError "bad notification token status" + liftIO $ logError "bad notification token status" where deliverNotification :: PushProvider -> PushProviderClient deliverNotification pp tkn ntf = do @@ -277,12 +294,12 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu let dhSecret = C.dh' dhPubKey srvDhPrivKey tknId <- getId regCode <- getRegCode - atomically $ do - tkn <- mkNtfTknData tknId newTkn ks dhSecret regCode - addNtfToken st tknId tkn - writeTBQueue pushQ (tkn, PNVerification regCode) + tkn <- atomically $ mkNtfTknData tknId newTkn ks dhSecret regCode + atomically $ addNtfToken st tknId tkn + atomically $ writeTBQueue pushQ (tkn, PNVerification regCode) + withNtfLog (`logCreateToken` tkn) pure (corrId, "", NRTknId tknId srvDhPubKey) - NtfReqCmd SToken (NtfTkn tkn@NtfTknData {ntfTknId, tknStatus, tknRegCode, tknDhSecret, tknDhKeys = (srvDhPubKey, srvDhPrivKey)}) (corrId, tknId, cmd) -> do + NtfReqCmd SToken (NtfTkn tkn@NtfTknData {ntfTknId, tknStatus, tknRegCode, tknDhSecret, tknDhKeys = (srvDhPubKey, srvDhPrivKey), tknCronInterval}) (corrId, tknId, cmd) -> do status <- readTVarIO tknStatus (corrId,tknId,) <$> case cmd of TNEW (NewNtfTkn _ _ dhPubKey) -> do @@ -301,6 +318,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu atomically $ writeTVar tknStatus NTActive tIds <- atomically $ removeInactiveTokenRegistrations st tkn forM_ tIds cancelInvervalNotifications + withNtfLog $ \s -> logTokenStatus s tknId NTActive pure NROk | otherwise -> do logDebug "TVFY - incorrect code or token status" @@ -315,6 +333,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu writeTVar tknStatus NTRegistered addNtfToken st tknId tkn {token = token', tknRegCode = regCode} writeTBQueue pushQ (tkn, PNVerification regCode) + withNtfLog $ \s -> logUpdateToken s tknId token' regCode pure NROk TDEL -> do logDebug "TDEL" @@ -323,21 +342,26 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu forM_ qs $ \SMPQueueNtf {smpServer, notifierId} -> atomically $ removeSubscription ca smpServer (SPNotifier, notifierId) cancelInvervalNotifications tknId + withNtfLog (`logDeleteToken` tknId) pure NROk TCRN 0 -> do logDebug "TCRN 0" + atomically $ writeTVar tknCronInterval 0 cancelInvervalNotifications tknId + withNtfLog $ \s -> logTokenCron s tknId 0 pure NROk TCRN int | int < 20 -> pure $ NRErr QUOTA | otherwise -> do logDebug "TCRN" + atomically $ writeTVar tknCronInterval int atomically (TM.lookup tknId intervalNotifiers) >>= \case Nothing -> runIntervalNotifier int Just IntervalNotifier {interval, action} -> unless (interval == int) $ do uninterruptibleCancel action runIntervalNotifier int + withNtfLog $ \s -> logTokenCron s tknId int pure NROk where runIntervalNotifier interval = do @@ -352,13 +376,13 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu logDebug "SNEW - new subscription" st <- asks store subId <- getId - atomically $ do - sub <- mkNtfSubData newSub - (corrId,"",) - <$> ( addNtfSubscription st subId sub >>= \case - Just _ -> writeTBQueue newSubQ (NtfSub sub) $> NRSubId subId - _ -> pure $ NRErr AUTH - ) + sub <- atomically $ mkNtfSubData subId newSub + resp <- + atomically (addNtfSubscription st subId sub) >>= \case + Just _ -> atomically (writeTBQueue newSubQ $ NtfSub sub) $> NRSubId subId + _ -> pure $ NRErr AUTH + withNtfLog (`logCreateSubscription` sub) + pure (corrId, "", resp) NtfReqCmd SSubscription (NtfSub NtfSubData {smpQueue = SMPQueueNtf {smpServer, notifierId}, notifierKey = registeredNKey, subStatus}) (corrId, subId, cmd) -> do status <- readTVarIO subStatus (corrId,subId,) <$> case cmd of @@ -375,6 +399,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu st <- asks store atomically $ deleteNtfSubscription st subId atomically $ removeSubscription ca smpServer (SPNotifier, notifierId) + withNtfLog (`logDeleteSubscription` subId) pure NROk PING -> pure NRPong getId :: m NtfEntityId @@ -390,44 +415,5 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu atomically (TM.lookupDelete tknId intervalNotifiers) >>= mapM_ (uninterruptibleCancel . action) --- NReqCreate corrId tokenId smpQueue -> pure (corrId, "", NROk) --- do --- st <- asks store --- (pubDhKey, privDhKey) <- liftIO C.generateKeyPair' --- let dhSecret = C.dh' dhPubKey privDhKey --- sub <- atomically $ mkNtfSubsciption smpQueue token verifyKey dhSecret --- addSubRetry 3 st sub >>= \case --- Nothing -> pure (corrId, "", NRErr INTERNAL) --- Just sId -> do --- atomically $ writeTBQueue subQ sub --- pure (corrId, sId, NRSubId pubDhKey) --- where --- addSubRetry :: Int -> NtfSubscriptionsStore -> NtfSubsciption -> m (Maybe NtfSubsciptionId) --- addSubRetry 0 _ _ = pure Nothing --- addSubRetry n st sub = do --- sId <- getId --- -- create QueueRec record with these ids and keys --- atomically (addNtfSub st sId sub) >>= \case --- Nothing -> addSubRetry (n - 1) st sub --- _ -> pure $ Just sId --- getId :: m NtfSubsciptionId --- getId = do --- n <- asks $ subIdBytes . config --- gVar <- asks idsDrg --- atomically (randomBytes n gVar) --- NReqCommand sub@NtfSubsciption {tokenId, subStatus} (corrId, subId, cmd) -> --- (corrId,subId,) <$> case cmd of --- NCSubCreate tokenId smpQueue -> pure NROk --- do --- st <- asks store --- (pubDhKey, privDhKey) <- liftIO C.generateKeyPair' --- let dhSecret = C.dh' (dhPubKey newSub) privDhKey --- atomically (updateNtfSub st sub newSub dhSecret) >>= \case --- Nothing -> pure $ NRErr INTERNAL --- _ -> atomically $ do --- whenM ((== NSEnd) <$> readTVar status) $ writeTBQueue subQ sub --- pure $ NRSubId pubDhKey --- NCSubCheck -> NRStat <$> readTVarIO subStatus --- NCSubDelete -> do --- st <- asks store --- atomically (deleteNtfSub st subId) $> NROk +withNtfLog :: (MonadUnliftIO m, MonadReader NtfEnv m) => (StoreLog 'WriteMode -> IO a) -> m () +withNtfLog action = liftIO . mapM_ action =<< asks storeLog diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs index d0b9770b4..f1f59ad06 100644 --- a/src/Simplex/Messaging/Notifications/Server/Env.hs +++ b/src/Simplex/Messaging/Notifications/Server/Env.hs @@ -22,12 +22,14 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Notifications.Protocol import Simplex.Messaging.Notifications.Server.Push.APNS import Simplex.Messaging.Notifications.Server.Store +import Simplex.Messaging.Notifications.Server.StoreLog import Simplex.Messaging.Protocol (CorrId, SMPServer, Transmission) import Simplex.Messaging.Server.Expiration import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Transport (ATransport) import Simplex.Messaging.Transport.Server (loadFingerprint, loadTLSServerParams) +import System.IO (IOMode (..)) import UnliftIO.STM data NtfServerConfig = NtfServerConfig @@ -40,6 +42,8 @@ data NtfServerConfig = NtfServerConfig smpAgentCfg :: SMPClientAgentConfig, apnsConfig :: APNSPushClientConfig, inactiveClientExpiration :: Maybe ExpirationConfig, + storeLogFile :: Maybe FilePath, + resubscribeDelay :: Int, -- microseconds -- CA certificate private key is not needed for initialization caCertificateFile :: FilePath, privateKeyFile :: FilePath, @@ -58,6 +62,7 @@ data NtfEnv = NtfEnv subscriber :: NtfSubscriber, pushServer :: NtfPushServer, store :: NtfStore, + storeLog :: Maybe (StoreLog 'WriteMode), idsDrg :: TVar ChaChaDRG, serverIdentity :: C.KeyHash, tlsServerParams :: T.ServerParams, @@ -65,14 +70,15 @@ data NtfEnv = NtfEnv } newNtfServerEnv :: (MonadUnliftIO m, MonadRandom m) => NtfServerConfig -> m NtfEnv -newNtfServerEnv config@NtfServerConfig {subQSize, pushQSize, smpAgentCfg, apnsConfig, caCertificateFile, certificateFile, privateKeyFile} = do +newNtfServerEnv config@NtfServerConfig {subQSize, pushQSize, smpAgentCfg, apnsConfig, storeLogFile, caCertificateFile, certificateFile, privateKeyFile} = do idsDrg <- newTVarIO =<< drgNew store <- atomically newNtfStore + storeLog <- liftIO $ mapM (`readWriteNtfStore` store) storeLogFile subscriber <- atomically $ newNtfSubscriber subQSize smpAgentCfg pushServer <- atomically $ newNtfPushServer pushQSize apnsConfig tlsServerParams <- liftIO $ loadTLSServerParams caCertificateFile certificateFile privateKeyFile Fingerprint fp <- liftIO $ loadFingerprint caCertificateFile - pure NtfEnv {config, subscriber, pushServer, store, idsDrg, tlsServerParams, serverIdentity = C.KeyHash fp} + pure NtfEnv {config, subscriber, pushServer, store, storeLog, idsDrg, tlsServerParams, serverIdentity = C.KeyHash fp} data NtfSubscriber = NtfSubscriber { smpSubscribers :: TMap SMPServer SMPSubscriber, diff --git a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs index ca029f8da..8e6d1ed0c 100644 --- a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs +++ b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs @@ -10,6 +10,7 @@ module Simplex.Messaging.Notifications.Server.Push.APNS where +import Control.Exception (Exception) import Control.Logger.Simple import Control.Monad.Except import Crypto.Hash.Algorithms (SHA256 (..)) @@ -320,7 +321,7 @@ data PushProviderError | PPTokenInvalid | PPRetryLater | PPPermanentError - deriving (Show) + deriving (Show, Exception) type PushProviderClient = NtfTknData -> PushNotification -> ExceptT PushProviderError IO () diff --git a/src/Simplex/Messaging/Notifications/Server/Store.hs b/src/Simplex/Messaging/Notifications/Server/Store.hs index 2d063d93a..38ef6cc72 100644 --- a/src/Simplex/Messaging/Notifications/Server/Store.hs +++ b/src/Simplex/Messaging/Notifications/Server/Store.hs @@ -17,6 +17,7 @@ import qualified Data.Map.Strict as M import Data.Maybe (catMaybes) import Data.Set (Set) import qualified Data.Set as S +import Data.Word (Word16) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Notifications.Protocol import Simplex.Messaging.Protocol (NtfPrivateSignKey) @@ -49,26 +50,19 @@ data NtfTknData = NtfTknData tknVerifyKey :: C.APublicVerifyKey, tknDhKeys :: C.KeyPair 'C.X25519, tknDhSecret :: C.DhSecretX25519, - tknRegCode :: NtfRegCode + tknRegCode :: NtfRegCode, + tknCronInterval :: TVar Word16 } mkNtfTknData :: NtfTokenId -> NewNtfEntity 'Token -> C.KeyPair 'C.X25519 -> C.DhSecretX25519 -> NtfRegCode -> STM NtfTknData mkNtfTknData ntfTknId (NewNtfTkn token tknVerifyKey _) tknDhKeys tknDhSecret tknRegCode = do tknStatus <- newTVar NTRegistered - pure NtfTknData {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode} - --- data NtfSubscriptionsStore = NtfSubscriptionsStore - --- { subscriptions :: TMap NtfSubsciptionId NtfSubsciption, --- activeSubscriptions :: TMap (SMPServer, NotifierId) NtfSubsciptionId --- } --- do --- subscriptions <- newTVar M.empty --- activeSubscriptions <- newTVar M.empty --- pure NtfSubscriptionsStore {subscriptions, activeSubscriptions} + tknCronInterval <- newTVar 0 + pure NtfTknData {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval} data NtfSubData = NtfSubData - { smpQueue :: SMPQueueNtf, + { ntfSubId :: NtfSubscriptionId, + smpQueue :: SMPQueueNtf, notifierKey :: NtfPrivateSignKey, tokenId :: NtfTokenId, subStatus :: TVar NtfSubStatus @@ -175,10 +169,10 @@ getActiveNtfToken st tknId = tStatus <- readTVar tknStatus pure $ if tStatus == NTActive then Just tkn else Nothing -mkNtfSubData :: NewNtfEntity 'Subscription -> STM NtfSubData -mkNtfSubData (NewNtfSub tokenId smpQueue notifierKey) = do +mkNtfSubData :: NtfSubscriptionId -> NewNtfEntity 'Subscription -> STM NtfSubData +mkNtfSubData ntfSubId (NewNtfSub tokenId smpQueue notifierKey) = do subStatus <- newTVar NSNew - pure NtfSubData {smpQueue, tokenId, subStatus, notifierKey} + pure NtfSubData {ntfSubId, smpQueue, tokenId, subStatus, notifierKey} addNtfSubscription :: NtfStore -> NtfSubscriptionId -> NtfSubData -> STM (Maybe ()) addNtfSubscription st subId sub@NtfSubData {smpQueue, tokenId} = diff --git a/src/Simplex/Messaging/Notifications/Server/StoreLog.hs b/src/Simplex/Messaging/Notifications/Server/StoreLog.hs new file mode 100644 index 000000000..7ad0b2fe6 --- /dev/null +++ b/src/Simplex/Messaging/Notifications/Server/StoreLog.hs @@ -0,0 +1,227 @@ +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE GADTs #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} + +module Simplex.Messaging.Notifications.Server.StoreLog + ( StoreLog, + NtfStoreLogRecord (..), + readWriteNtfStore, + logCreateToken, + logTokenStatus, + logUpdateToken, + logTokenCron, + logDeleteToken, + logCreateSubscription, + logSubscriptionStatus, + logDeleteSubscription, + closeStoreLog, + ) +where + +import Control.Concurrent.STM +import Control.Monad (void) +import qualified Data.Attoparsec.ByteString.Char8 as A +import qualified Data.ByteString.Char8 as B +import Data.Word (Word16) +import qualified Simplex.Messaging.Crypto as C +import Simplex.Messaging.Encoding.String +import Simplex.Messaging.Notifications.Protocol +import Simplex.Messaging.Notifications.Server.Store +import Simplex.Messaging.Protocol (NtfPrivateSignKey) +import Simplex.Messaging.Server.StoreLog +import Simplex.Messaging.Util (whenM) +import System.Directory (doesFileExist, renameFile) +import System.IO + +data NtfStoreLogRecord + = CreateToken NtfTknRec + | TokenStatus NtfTokenId NtfTknStatus + | UpdateToken NtfTokenId DeviceToken NtfRegCode + | TokenCron NtfTokenId Word16 + | DeleteToken NtfTokenId + | CreateSubscription NtfSubRec + | SubscriptionStatus NtfSubscriptionId NtfSubStatus + | DeleteSubscription NtfSubscriptionId + +data NtfTknRec = NtfTknRec + { ntfTknId :: NtfTokenId, + token :: DeviceToken, + tknStatus :: NtfTknStatus, + tknVerifyKey :: C.APublicVerifyKey, + tknDhKeys :: C.KeyPair 'C.X25519, + tknDhSecret :: C.DhSecretX25519, + tknRegCode :: NtfRegCode, + tknCronInterval :: Word16 + } + +mkTknData :: NtfTknRec -> STM NtfTknData +mkTknData NtfTknRec {ntfTknId, token, tknStatus = status, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval = cronInt} = do + tknStatus <- newTVar status + tknCronInterval <- newTVar cronInt + pure NtfTknData {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval} + +mkTknRec :: NtfTknData -> STM NtfTknRec +mkTknRec NtfTknData {ntfTknId, token, tknStatus = status, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval = cronInt} = do + tknStatus <- readTVar status + tknCronInterval <- readTVar cronInt + pure NtfTknRec {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval} + +data NtfSubRec = NtfSubRec + { ntfSubId :: NtfSubscriptionId, + smpQueue :: SMPQueueNtf, + notifierKey :: NtfPrivateSignKey, + tokenId :: NtfTokenId, + subStatus :: NtfSubStatus + } + +mkSubData :: NtfSubRec -> STM NtfSubData +mkSubData NtfSubRec {ntfSubId, smpQueue, notifierKey, tokenId, subStatus = status} = do + subStatus <- newTVar status + pure NtfSubData {ntfSubId, smpQueue, notifierKey, tokenId, subStatus} + +mkSubRec :: NtfSubData -> STM NtfSubRec +mkSubRec NtfSubData {ntfSubId, smpQueue, notifierKey, tokenId, subStatus = status} = do + subStatus <- readTVar status + pure NtfSubRec {ntfSubId, smpQueue, notifierKey, tokenId, subStatus} + +instance StrEncoding NtfStoreLogRecord where + strEncode = \case + CreateToken tknRec -> strEncode (Str "TCREATE", tknRec) + TokenStatus tknId tknStatus -> strEncode (Str "TSTATUS", tknId, tknStatus) + UpdateToken tknId token regCode -> strEncode (Str "TUPDATE", tknId, token, regCode) + TokenCron tknId cronInt -> strEncode (Str "TCRON", tknId, cronInt) + DeleteToken tknId -> strEncode (Str "TDELETE", tknId) + CreateSubscription subRec -> strEncode (Str "SCREATE", subRec) + SubscriptionStatus subId subStatus -> strEncode (Str "SSTATUS", subId, subStatus) + DeleteSubscription subId -> strEncode (Str "SDELETE", subId) + strP = + A.choice + [ "TCREATE " *> (CreateToken <$> strP), + "TSTATUS " *> (TokenStatus <$> strP_ <*> strP), + "TUPDATE " *> (UpdateToken <$> strP_ <*> strP_ <*> strP), + "TCRON " *> (TokenCron <$> strP_ <*> strP), + "TDELETE " *> (DeleteToken <$> strP), + "SCREATE " *> (CreateSubscription <$> strP), + "SSTATUS " *> (SubscriptionStatus <$> strP_ <*> strP), + "SDELETE " *> (DeleteSubscription <$> strP) + ] + +instance StrEncoding NtfTknRec where + strEncode NtfTknRec {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval} = + B.unwords + [ "tknId=" <> strEncode ntfTknId, + "token=" <> strEncode token, + "tokenStatus=" <> strEncode tknStatus, + "verifyKey=" <> strEncode tknVerifyKey, + "dhKeys=" <> strEncode tknDhKeys, + "dhSecret=" <> strEncode tknDhSecret, + "regCode=" <> strEncode tknRegCode, + "cron=" <> strEncode tknCronInterval + ] + strP = do + ntfTknId <- "tknId=" *> strP_ + token <- "token=" *> strP_ + tknStatus <- "tokenStatus=" *> strP_ + tknVerifyKey <- "verifyKey=" *> strP_ + tknDhKeys <- "dhKeys=" *> strP_ + tknDhSecret <- "dhSecret=" *> strP_ + tknRegCode <- "regCode=" *> strP_ + tknCronInterval <- "cron=" *> strP + pure NtfTknRec {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval} + +instance StrEncoding NtfSubRec where + strEncode NtfSubRec {ntfSubId, smpQueue, notifierKey, tokenId, subStatus} = + B.unwords + [ "subId=" <> strEncode ntfSubId, + "smpQueue=" <> strEncode smpQueue, + "notifierKey=" <> strEncode notifierKey, + "tknId=" <> strEncode tokenId, + "subStatus=" <> strEncode subStatus + ] + strP = do + ntfSubId <- "subId=" *> strP_ + smpQueue <- "smpQueue=" *> strP_ + notifierKey <- "notifierKey=" *> strP_ + tokenId <- "tknId=" *> strP_ + subStatus <- "subStatus=" *> strP + pure NtfSubRec {ntfSubId, smpQueue, notifierKey, tokenId, subStatus} + +logNtfStoreRecord :: StoreLog 'WriteMode -> NtfStoreLogRecord -> IO () +logNtfStoreRecord = writeStoreLogRecord + +logCreateToken :: StoreLog 'WriteMode -> NtfTknData -> IO () +logCreateToken s tkn = logNtfStoreRecord s . CreateToken =<< atomically (mkTknRec tkn) + +logTokenStatus :: StoreLog 'WriteMode -> NtfTokenId -> NtfTknStatus -> IO () +logTokenStatus s tknId tknStatus = logNtfStoreRecord s $ TokenStatus tknId tknStatus + +logUpdateToken :: StoreLog 'WriteMode -> NtfTokenId -> DeviceToken -> NtfRegCode -> IO () +logUpdateToken s tknId token regCode = logNtfStoreRecord s $ UpdateToken tknId token regCode + +logTokenCron :: StoreLog 'WriteMode -> NtfTokenId -> Word16 -> IO () +logTokenCron s tknId cronInt = logNtfStoreRecord s $ TokenCron tknId cronInt + +logDeleteToken :: StoreLog 'WriteMode -> NtfTokenId -> IO () +logDeleteToken s tknId = logNtfStoreRecord s $ DeleteToken tknId + +logCreateSubscription :: StoreLog 'WriteMode -> NtfSubData -> IO () +logCreateSubscription s sub = logNtfStoreRecord s . CreateSubscription =<< atomically (mkSubRec sub) + +logSubscriptionStatus :: StoreLog 'WriteMode -> NtfSubscriptionId -> NtfSubStatus -> IO () +logSubscriptionStatus s subId subStatus = logNtfStoreRecord s $ SubscriptionStatus subId subStatus + +logDeleteSubscription :: StoreLog 'WriteMode -> NtfSubscriptionId -> IO () +logDeleteSubscription s subId = logNtfStoreRecord s $ DeleteSubscription subId + +readWriteNtfStore :: FilePath -> NtfStore -> IO (StoreLog 'WriteMode) +readWriteNtfStore f st = do + whenM (doesFileExist f) $ do + readNtfStore f st + renameFile f $ f <> ".bak" + s <- openWriteStoreLog f + writeNtfStore s st + pure s + +readNtfStore :: FilePath -> NtfStore -> IO () +readNtfStore f st = mapM_ addNtfLogRecord . B.lines =<< B.readFile f + where + addNtfLogRecord s = case strDecode s of + Left e -> B.putStrLn $ "Log parsing error (" <> B.pack e <> "): " <> B.take 100 s + Right lr -> atomically $ case lr of + CreateToken r@NtfTknRec {ntfTknId} -> do + tkn <- mkTknData r + addNtfToken st ntfTknId tkn + TokenStatus tknId status -> + getNtfToken st tknId + >>= mapM_ (\NtfTknData {tknStatus} -> writeTVar tknStatus status) + UpdateToken tknId token' tknRegCode -> + getNtfToken st tknId + >>= mapM_ + ( \tkn@NtfTknData {tknStatus} -> do + removeTokenRegistration st tkn + writeTVar tknStatus NTRegistered + addNtfToken st tknId tkn {token = token', tknRegCode} + ) + TokenCron tknId cronInt -> + getNtfToken st tknId + >>= mapM_ (\NtfTknData {tknCronInterval} -> writeTVar tknCronInterval cronInt) + DeleteToken tknId -> + void $ deleteNtfToken st tknId + CreateSubscription r@NtfSubRec {ntfSubId} -> do + sub <- mkSubData r + void $ addNtfSubscription st ntfSubId sub + SubscriptionStatus subId status -> + getNtfSubscription st subId + >>= mapM_ (\NtfSubData {subStatus} -> writeTVar subStatus status) + DeleteSubscription subId -> + deleteNtfSubscription st subId + +writeNtfStore :: StoreLog 'WriteMode -> NtfStore -> IO () +writeNtfStore s NtfStore {tokens, subscriptions} = do + atomically (readTVar tokens >>= mapM mkTknRec) + >>= mapM_ (writeStoreLogRecord s . CreateToken) + atomically (readTVar subscriptions >>= mapM mkSubRec) + >>= mapM_ (writeStoreLogRecord s . CreateSubscription) diff --git a/src/Simplex/Messaging/Server/StoreLog.hs b/src/Simplex/Messaging/Server/StoreLog.hs index 49ec62c1f..3d8f9625d 100644 --- a/src/Simplex/Messaging/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Server/StoreLog.hs @@ -13,6 +13,7 @@ module Simplex.Messaging.Server.StoreLog openReadStoreLog, storeLogFilePath, closeStoreLog, + writeStoreLogRecord, logCreateQueue, logSecureQueue, logAddNotifier, @@ -91,7 +92,10 @@ instance StrEncoding StoreLogRecord where <|> "NDELETE" *> (DeleteNotifier <$> strP) openWriteStoreLog :: FilePath -> IO (StoreLog 'WriteMode) -openWriteStoreLog f = WriteStoreLog f <$> openFile f WriteMode +openWriteStoreLog f = do + h <- openFile f WriteMode + hSetBuffering h LineBuffering + pure $ WriteStoreLog f h openReadStoreLog :: FilePath -> IO (StoreLog 'ReadMode) openReadStoreLog f = do @@ -108,7 +112,7 @@ closeStoreLog = \case WriteStoreLog _ h -> hClose h ReadStoreLog _ h -> hClose h -writeStoreLogRecord :: StoreLog 'WriteMode -> StoreLogRecord -> IO () +writeStoreLogRecord :: StrEncoding r => StoreLog 'WriteMode -> r -> IO () writeStoreLogRecord (WriteStoreLog _ h) r = do B.hPutStrLn h $ strEncode r hFlush h diff --git a/src/Simplex/Messaging/Transport/Server.hs b/src/Simplex/Messaging/Transport/Server.hs index 2920b1d9b..84d04e68a 100644 --- a/src/Simplex/Messaging/Transport/Server.hs +++ b/src/Simplex/Messaging/Transport/Server.hs @@ -54,7 +54,7 @@ runTCPServer started port server = do (closeServer started clients) $ \sock -> forever . E.bracketOnError (accept sock) (close . fst) $ \(conn, _peer) -> do -- catchAll_ is needed here in case the connection was closed earlier - cId <- atomically $ stateTVar clientId $ \cId -> (cId + 1, cId + 1) + cId <- atomically $ stateTVar clientId $ \cId -> let cId' = cId + 1 in (cId', cId') let closeConn _ = atomically (TM.delete cId clients) >> gracefulClose conn 5000 `catchAll_` pure () tId <- mkWeakThreadId =<< server conn `forkFinally` closeConn atomically $ TM.insert cId tId clients diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 0882c7e1b..1e9aafad5 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -8,6 +8,7 @@ module AgentTests.FunctionalAPITests ( functionalAPITests, + makeConnection, get, (##>), (=##>), diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index 5213bc67a..48c68be52 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -9,8 +9,8 @@ module AgentTests.NotificationTests where -- import Control.Logger.Simple (LogConfig (..), LogLevel (..), setLogLevel, withGlobalLogging) -import AgentTests.FunctionalAPITests (get, (##>), (=##>), pattern Msg) -import Control.Concurrent (threadDelay) +import AgentTests.FunctionalAPITests (get, makeConnection, (##>), (=##>), pattern Msg) +import Control.Concurrent (killThread, threadDelay) import Control.Monad.Except import qualified Data.Aeson as J import qualified Data.Aeson.Types as JT @@ -69,6 +69,11 @@ notificationTests t = withSmpServer t $ withAPNSMockServer $ \apns -> withNtfServer t $ testChangeNotificationsMode apns + describe "Notifications server store log" $ do + it "should save and restore tokens and subscriptions" $ \_ -> + withSmpServer t $ + withAPNSMockServer $ \apns -> + testNotificationsStoreLog t apns testNotificationToken :: APNSMockServer -> IO () testNotificationToken APNSMockServer {apnsQ} = do @@ -253,25 +258,9 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers Right () <- runExceptT $ do -- alice registers notification token - let aliceTkn = DeviceToken PPApnsTest "abcd" - NTRegistered <- registerNtfToken alice aliceTkn NMInstant - APNSMockRequest {notification = APNSNotification {aps = APNSBackground _, notificationData = Just ntfData}, sendApnsResponse} <- - atomically $ readTBQueue apnsQ - verification <- ntfData .-> "verification" - nonce <- C.cbNonce <$> ntfData .-> "nonce" - liftIO $ sendApnsResponse APNSRespOk - verifyNtfToken alice aliceTkn nonce verification - NTActive <- checkNtfToken alice aliceTkn + _ <- registerTestToken alice "abcd" NMInstant apnsQ -- bob registers notification token - let bobTkn = DeviceToken PPApnsTest "bcde" - NTRegistered <- registerNtfToken bob bobTkn NMInstant - APNSMockRequest {notification = APNSNotification {aps = APNSBackground _, notificationData = Just ntfData'}, sendApnsResponse = sendApnsResponse'} <- - atomically $ readTBQueue apnsQ - verification' <- ntfData' .-> "verification" - nonce' <- C.cbNonce <$> ntfData' .-> "nonce" - liftIO $ sendApnsResponse' APNSRespOk - verifyNtfToken bob bobTkn nonce' verification' - NTActive <- checkNtfToken bob bobTkn + _ <- registerTestToken bob "bcde" NMInstant apnsQ -- establish connection liftIO $ threadDelay 50000 (bobId, qInfo) <- createConnection alice SCMInvitation @@ -308,6 +297,19 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do baseId = 3 msgId = subtract baseId +registerTestToken :: AgentClient -> ByteString -> NotificationsMode -> TBQueue APNSMockRequest -> ExceptT AgentErrorType IO DeviceToken +registerTestToken a token mode apnsQ = do + let tkn = DeviceToken PPApnsTest token + NTRegistered <- registerNtfToken a tkn mode + APNSMockRequest {notification = APNSNotification {aps = APNSBackground _, notificationData = Just ntfData'}, sendApnsResponse = sendApnsResponse'} <- + atomically $ readTBQueue apnsQ + verification' <- ntfData' .-> "verification" + nonce' <- C.cbNonce <$> ntfData' .-> "nonce" + liftIO $ sendApnsResponse' APNSRespOk + verifyNtfToken a tkn nonce' verification' + NTActive <- checkNtfToken a tkn + pure tkn + testChangeNotificationsMode :: APNSMockServer -> IO () testChangeNotificationsMode APNSMockServer {apnsQ} = do alice <- getSMPAgentClient agentCfg initAgentServers @@ -322,15 +324,7 @@ testChangeNotificationsMode APNSMockServer {apnsQ} = do get alice ##> ("", bobId, CON) get bob ##> ("", aliceId, CON) -- register notification token, set mode to NMPeriodic - let tkn = DeviceToken PPApnsTest "abcd" - NTRegistered <- registerNtfToken alice tkn NMPeriodic - APNSMockRequest {notification = APNSNotification {aps = APNSBackground _, notificationData = Just ntfData}, sendApnsResponse} <- - atomically $ readTBQueue apnsQ - verification <- ntfData .-> "verification" - vNonce <- C.cbNonce <$> ntfData .-> "nonce" - liftIO $ sendApnsResponse APNSRespOk - verifyNtfToken alice tkn vNonce verification - NTActive <- checkNtfToken alice tkn + tkn <- registerTestToken alice "abcd" NMPeriodic apnsQ -- send message, no notification 1 <- msgId <$> sendMessage bob aliceId (SMP.MsgFlags True) "hello" get bob ##> ("", aliceId, SENT $ baseId + 1) @@ -362,6 +356,33 @@ testChangeNotificationsMode APNSMockServer {apnsQ} = do baseId = 3 msgId = subtract baseId +testNotificationsStoreLog :: ATransport -> APNSMockServer -> IO () +testNotificationsStoreLog t APNSMockServer {apnsQ} = do + alice <- getSMPAgentClient agentCfg initAgentServers + bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers + Right (aliceId, bobId) <- withNtfServerStoreLog t $ \threadId -> runExceptT $ do + _ <- registerTestToken alice "abcd" NMInstant apnsQ + (aliceId, bobId) <- makeConnection alice bob + liftIO $ threadDelay 250000 + 4 <- sendMessage bob aliceId (SMP.MsgFlags True) "hello" + get bob ##> ("", aliceId, SENT 4) + void $ messageNotification apnsQ + get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False + ackMessage alice bobId 4 + liftIO $ killThread threadId + pure (aliceId, bobId) + + liftIO $ threadDelay 250000 + + Right () <- withNtfServerStoreLog t $ \threadId -> runExceptT $ do + liftIO $ threadDelay 250000 + 5 <- sendMessage bob aliceId (SMP.MsgFlags True) "hello again" + get bob ##> ("", aliceId, SENT 5) + void $ messageNotification apnsQ + get alice =##> \case ("", c, Msg "hello again") -> c == bobId; _ -> False + liftIO $ killThread threadId + pure () + messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString) messageNotification apnsQ = do 500000 `timeout` atomically (readTBQueue apnsQ) >>= \case diff --git a/tests/NtfClient.hs b/tests/NtfClient.hs index caa7e16fc..86ab111e6 100644 --- a/tests/NtfClient.hs +++ b/tests/NtfClient.hs @@ -62,6 +62,9 @@ apnsTestPort = "6010" testKeyHash :: C.KeyHash testKeyHash = "LcJUMfVhwD8yxjAiSaDzzGF3-kLG4Uh0Fl_ZIjrRwjI=" +ntfTestStoreLogFile :: FilePath +ntfTestStoreLogFile = "tests/tmp/ntf-server-store.log" + testNtfClient :: (Transport c, MonadUnliftIO m) => (THandle c -> m a) -> m a testNtfClient client = runTransportClient testHost ntfTestPort (Just testKeyHash) (Just defaultKeepAliveOpts) $ \h -> @@ -85,16 +88,24 @@ ntfServerCfg = http2cfg = defaultHTTP2ClientConfig {caStoreFile = "tests/fixtures/ca.crt"} }, inactiveClientExpiration = Just defaultInactiveClientExpiration, + storeLogFile = Nothing, + resubscribeDelay = 1000, -- CA certificate private key is not needed for initialization caCertificateFile = "tests/fixtures/ca.crt", privateKeyFile = "tests/fixtures/server.key", certificateFile = "tests/fixtures/server.crt" } +withNtfServerStoreLog :: (MonadUnliftIO m, MonadRandom m) => ATransport -> (ThreadId -> m a) -> m a +withNtfServerStoreLog t = withNtfServerCfg t ntfServerCfg {storeLogFile = Just ntfTestStoreLogFile} + withNtfServerThreadOn :: (MonadUnliftIO m, MonadRandom m) => ATransport -> ServiceName -> (ThreadId -> m a) -> m a -withNtfServerThreadOn t port' = +withNtfServerThreadOn t port' = withNtfServerCfg t ntfServerCfg {transports = [(port', t)]} + +withNtfServerCfg :: (MonadUnliftIO m, MonadRandom m) => ATransport -> NtfServerConfig -> (ThreadId -> m a) -> m a +withNtfServerCfg t cfg = serverBracket - (\started -> runNtfServerBlocking started ntfServerCfg {transports = [(port', t)]}) + (\started -> runNtfServerBlocking started cfg {transports = [(ntfTestPort, t)]}) (pure ()) serverBracket :: MonadUnliftIO m => (TMVar Bool -> m ()) -> m () -> (ThreadId -> m a) -> m a