diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 1a3565f2f..7417b823a 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -778,7 +778,7 @@ newRcvConnSrv c userId connId enableNtfs cMode clientData pqInitKeys subMode srv lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId when enableNtfs $ do ns <- asks ntfSupervisor - atomically $ sendNtfSubCommand ns (connId, NSCCreate) + atomically $ sendNtfSubCommand ns (NSCCreate, [connId]) let crData = ConnReqUriData SSSimplex smpAgentVRange [qUri] clientData case cMode of SCMContact -> pure (connId, CRContactUri crData) @@ -923,7 +923,7 @@ createReplyQueue c ConnData {userId, connId, enableNtfs} SndQueue {smpClientVers lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId when enableNtfs $ do ns <- asks ntfSupervisor - atomically $ sendNtfSubCommand ns (connId, NSCCreate) + atomically $ sendNtfSubCommand ns (NSCCreate, [connId]) pure qInfo -- | Approve confirmation (LET command) in Reader monad @@ -1013,14 +1013,13 @@ subscribeConnections' c connIds = do order _ = 4 sendNtfCreate :: NtfSupervisor -> Map ConnId (Either AgentErrorType ()) -> Map ConnId SomeConn -> AM' () sendNtfCreate ns rcvRs cs = do - -- TODO this needs to be batched end to end. - -- Currently, the only change is to ignore failed subscriptions. let oks = M.keysSet $ M.filter (either temporaryAgentError $ const True) rcvRs - forM_ (M.restrictKeys cs oks) $ \case - SomeConn _ conn -> do - let cmd = if enableNtfs $ toConnData conn then NSCCreate else NSCSmpDelete - ConnData {connId} = toConnData conn - atomically $ writeTBQueue (ntfSubQ ns) (connId, cmd) + cs' = M.restrictKeys cs oks + (csCreate, csDelete) = M.partition (\(SomeConn _ conn) -> enableNtfs $ toConnData conn) cs' + sendNtfCmd NSCCreate csCreate + sendNtfCmd NSCSmpDelete csDelete + where + sendNtfCmd cmd cs' = forM_ (L.nonEmpty $ M.keys cs') $ \cids -> atomically $ writeTBQueue (ntfSubQ ns) (cmd, cids) resumeDelivery :: Map ConnId SomeConn -> AM () resumeDelivery conns = do conns' <- M.restrictKeys conns . S.fromList <$> withStore' c getConnectionsForDelivery @@ -1259,7 +1258,7 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do withStore' c $ \db -> deleteConnRcvQueue db rq' when (enableNtfs cData) $ do ns <- asks ntfSupervisor - atomically $ sendNtfSubCommand ns (connId, NSCCreate) + atomically $ sendNtfSubCommand ns (NSCCreate, [connId]) let conn' = DuplexConnection cData (rq'' :| rqs') sqs notify $ SWITCH QDRcv SPCompleted $ connectionStats conn' _ -> internalErr "ICQDelete: cannot delete the only queue in connection" @@ -1716,12 +1715,6 @@ connRcvQueues = \case SndConnection _ _ -> [] NewConnection _ -> [] -disableConn :: AgentClient -> ConnId -> AM' () -disableConn c connId = do - atomically $ removeSubscription c connId - ns <- asks ntfSupervisor - atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCDeleteSub) - -- Unlike deleteConnectionsAsync, this function does not mark connections as deleted in case of deletion failure. deleteConnections' :: AgentClient -> [ConnId] -> AM (Map ConnId (Either AgentErrorType ())) deleteConnections' = deleteConnections_ getConns False False @@ -1748,7 +1741,7 @@ prepareDeleteConnections_ getConnections c waitDelivery connIds = do (delRs, rcvQs) = M.mapEither rcvQueues cs rqs = concat $ M.elems rcvQs connIds' = M.keys rcvQs - lift . forM_ connIds' $ disableConn c + lift $ forM_ (L.nonEmpty connIds') unsubConnIds -- ! delRs is not used to notify about the result in any of the calling functions, -- ! it is only used to check results count in deleteConnections_; -- ! if it was used to notify about the result, it might be necessary to differentiate @@ -1762,6 +1755,12 @@ prepareDeleteConnections_ getConnections c waitDelivery connIds = do rcvQueues (SomeConn _ conn) = case connRcvQueues conn of [] -> Left $ Right () rqs -> Right rqs + unsubConnIds :: NonEmpty ConnId -> AM' () + unsubConnIds connIds' = do + forM_ connIds' $ \connId -> + atomically $ removeSubscription c connId + ns <- asks ntfSupervisor + atomically $ writeTBQueue (ntfSubQ ns) (NSCDeleteSub, connIds') notify = atomically . writeTBQueue (subQ c) deleteConnQueues :: AgentClient -> Bool -> Bool -> [RcvQueue] -> AM' (Map ConnId (Either AgentErrorType ())) @@ -2018,7 +2017,7 @@ toggleConnectionNtfs' c connId enable = do withStore' c $ \db -> setConnectionNtfs db connId enable ns <- asks ntfSupervisor let cmd = if enable then NSCCreate else NSCSmpDelete - atomically $ sendNtfSubCommand ns (connId, cmd) + atomically $ sendNtfSubCommand ns (cmd, [connId]) deleteToken_ :: AgentClient -> NtfToken -> AM () deleteToken_ c@AgentClient {subQ} tkn@NtfToken {ntfTokenId, ntfTknStatus} = do @@ -2066,13 +2065,26 @@ deleteNtfSubs c deleteCmd = do sendNtfConnCommands :: AgentClient -> NtfSupervisorCommand -> AM () sendNtfConnCommands c cmd = do ns <- asks ntfSupervisor - connIds <- liftIO $ getSubscriptions c - forM_ connIds $ \connId -> do - withStore' c (`getConnData` connId) >>= \case - Just (ConnData {enableNtfs}, _) -> - when enableNtfs . atomically $ writeTBQueue (ntfSubQ ns) (connId, cmd) - _ -> - atomically $ writeTBQueue (subQ c) ("", connId, AEvt SAEConn $ ERR $ INTERNAL "no connection data") + connIds <- liftIO $ S.toList <$> getSubscriptions c + rs <- lift $ withStoreBatch' c (\db -> map (getConnData db) connIds) + let (connIds', errs) = enabledNtfConns (zip connIds rs) + forM_ (L.nonEmpty connIds') $ \connIds'' -> + atomically $ writeTBQueue (ntfSubQ ns) (cmd, connIds'') + -- TODO [batch ntf] notify ERRS + forM_ errs $ \(connId, e) -> + atomically $ writeTBQueue (subQ c) ("", connId, AEvt SAEConn $ ERR e) + where + enabledNtfConns :: [(ConnId, Either AgentErrorType (Maybe (ConnData, ConnectionMode)))] -> ([ConnId], [(ConnId, AgentErrorType)]) + enabledNtfConns = foldr addEnabledConn ([], []) + where + addEnabledConn :: + (ConnId, Either AgentErrorType (Maybe (ConnData, ConnectionMode))) -> + ([ConnId], [(ConnId, AgentErrorType)]) -> + ([ConnId], [(ConnId, AgentErrorType)]) + addEnabledConn cData_ (cIds, errs) = case cData_ of + (_, Right (Just (ConnData {connId, enableNtfs}, _))) -> if enableNtfs then (connId : cIds, errs) else (cIds, errs) + (connId, Right Nothing) -> (cIds, (connId, INTERNAL "no connection data") : errs) + (connId, Left e) -> (cIds, (connId, e) : errs) setNtfServers :: AgentClient -> [NtfServer] -> IO () setNtfServers c = atomically . writeTVar (ntfServers c) diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index c10ba91ca..0f179c70c 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -254,7 +254,7 @@ createAgentStore dbFilePath dbKey keepKey = createSQLiteStore dbFilePath dbKey k data NtfSupervisor = NtfSupervisor { ntfTkn :: TVar (Maybe NtfToken), - ntfSubQ :: TBQueue (ConnId, NtfSupervisorCommand), + ntfSubQ :: TBQueue (NtfSupervisorCommand, NonEmpty ConnId), ntfWorkers :: TMap NtfServer Worker, ntfSMPWorkers :: TMap SMPServer Worker } diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index fa6291a00..c45a3184a 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -2,9 +2,9 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedLists #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE TupleSections #-} {-# OPTIONS_GHC -fno-warn-ambiguous-fields #-} module Simplex.Messaging.Agent.NtfSubSupervisor @@ -23,21 +23,27 @@ import Control.Monad import Control.Monad.Reader import Control.Monad.Trans.Except import Data.Bifunctor (first) +import Data.Either (rights) +import Data.Foldable (foldr') +import Data.List.NonEmpty (NonEmpty (..)) +import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M +import qualified Data.Set as S import Data.Text (Text) import Data.Time (UTCTime, addUTCTime, getCurrentTime) import Data.Time.Clock (diffUTCTime) import Simplex.Messaging.Agent.Client import Simplex.Messaging.Agent.Env.SQLite -import Simplex.Messaging.Agent.Protocol (AEvent (..), AEvt (..), AgentErrorType (..), BrokerErrorType (..), ConnId, NotificationsMode (..), SAEntity (..)) +import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Stats import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite +import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Notifications.Protocol (NtfSubStatus (..), NtfTknStatus (..), SMPQueueNtf (..)) import Simplex.Messaging.Notifications.Types -import Simplex.Messaging.Protocol (NtfServer, SMPServer, sameSrvAddr) +import Simplex.Messaging.Protocol (NtfServer, sameSrvAddr) import Simplex.Messaging.Util (diffToMicroseconds, threadDelay', tshow, unlessM) import System.Random (randomR) import UnliftIO @@ -48,69 +54,110 @@ runNtfSupervisor :: AgentClient -> AM' () runNtfSupervisor c = do ns <- asks ntfSupervisor forever $ do - cmd@(connId, _) <- atomically . readTBQueue $ ntfSubQ ns - handleErr connId . agentOperationBracket c AONtfNetwork waitUntilActive $ - runExceptT (processNtfSub c cmd) >>= \case - Left e -> notifyErr connId e + cmd <- atomically . readTBQueue $ ntfSubQ ns + handleErr . agentOperationBracket c AONtfNetwork waitUntilActive $ + runExceptT (processNtfCmd c cmd) >>= \case + Left e -> notifyErr e Right _ -> return () where - handleErr :: ConnId -> AM' () -> AM' () - handleErr connId = E.handle $ \(e :: E.SomeException) -> do + handleErr :: AM' () -> AM' () + handleErr = E.handle $ \(e :: E.SomeException) -> do logError $ "runNtfSupervisor error " <> tshow e - notifyErr connId e - notifyErr connId e = notifyInternalError c connId $ "runNtfSupervisor error " <> show e + notifyErr e + notifyErr e = notifyInternalError' c $ "runNtfSupervisor error " <> show e -processNtfSub :: AgentClient -> (ConnId, NtfSupervisorCommand) -> AM () -processNtfSub c (connId, cmd) = do - logInfo $ "processNtfSub - connId = " <> tshow connId <> " - cmd = " <> tshow cmd +-- TODO [batch ntf] notify ERRS +processNtfCmd :: AgentClient -> (NtfSupervisorCommand, NonEmpty ConnId) -> AM () +processNtfCmd c (cmd, connIds) = do + logInfo $ "processNtfCmd - cmd = " <> tshow cmd case cmd of NSCCreate -> do - (a, RcvQueue {userId, server = smpServer, clientNtfCreds}) <- withStore c $ \db -> runExceptT $ do - a <- liftIO $ getNtfSubscription db connId - q <- ExceptT $ getPrimaryRcvQueue db connId - pure (a, q) - logInfo $ "processNtfSub, NSCCreate - a = " <> tshow a - case a of - Nothing -> do + rqSubActions <- lift $ rights <$> withStoreBatch c (\db -> map (getQueueSub db) (L.toList connIds)) + logInfo $ "processNtfCmd, NSCCreate - length rqSubs = " <> tshow (length rqSubActions) + let (ns, rs, css, cns) = partitionQueueSubActions rqSubActions + createNewSubs ns + resetSubs rs + lift $ do + mapM_ (getNtfSMPWorker True c) (S.fromList css) + mapM_ (getNtfNTFWorker True c) (S.fromList cns) + where + getQueueSub :: + DB.Connection -> + ConnId -> + IO (Either AgentErrorType (RcvQueue, Maybe NtfSupervisorSub)) + getQueueSub db connId = fmap (first storeError) $ runExceptT $ do + rq <- ExceptT $ getPrimaryRcvQueue db connId + sub <- liftIO $ getNtfSubscription db connId + pure (rq, sub) + createNewSubs :: [RcvQueue] -> AM () + createNewSubs rqs = do withTokenServer $ \ntfServer -> do - let newSub = newNtfSubscription userId connId smpServer Nothing ntfServer NASNew - withStore c $ \db -> createNtfSubscription db newSub $ NSASMP NSASmpKey - lift . void $ getNtfSMPWorker True c smpServer - (Just (sub@NtfSubscription {ntfServer = subNtfServer, smpServer = smpServer', ntfQueueId}, action_)) -> do - case (clientNtfCreds, ntfQueueId) of - (Just ClientNtfCreds {notifierId}, Just ntfQueueId') - | sameSrvAddr smpServer smpServer' && notifierId == ntfQueueId' -> create - | otherwise -> resetSubscription - (Nothing, Nothing) -> create - _ -> resetSubscription + let newSubs = map (rqToNewSub ntfServer) rqs + void $ lift $ withStoreBatch c (\db -> map (storeNewSub db) newSubs) + kickSMPWorkers rqs where - create :: AM () - create = case action_ of - -- action was set to NULL after worker internal error - Nothing -> resetSubscription - Just (action, _) - -- subscription was marked for deletion / is being deleted - | isDeleteNtfSubAction action -> resetSubscription - -- continue work on subscription (e.g. supervisor was repeatedly tasked with creating a subscription) - | otherwise -> case action of - NSANtf _ -> lift . void $ getNtfNTFWorker True c subNtfServer - NSASMP _ -> lift . void $ getNtfSMPWorker True c smpServer - resetSubscription :: AM () - resetSubscription = - withTokenServer $ \ntfServer -> do - let sub' = sub {smpServer, ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew} - withStore' c $ \db -> supervisorUpdateNtfSub db sub' (NSASMP NSASmpKey) - lift . void $ getNtfSMPWorker True c smpServer + rqToNewSub :: NtfServer -> RcvQueue -> NtfSubscription + rqToNewSub ntfServer RcvQueue {userId, connId, server} = newNtfSubscription userId connId server Nothing ntfServer NASNew + storeNewSub :: DB.Connection -> NtfSubscription -> IO (Either AgentErrorType ()) + storeNewSub db sub = first storeError <$> createNtfSubscription db sub (NSASMP NSASmpKey) + resetSubs :: [(RcvQueue, NtfSubscription)] -> AM () + resetSubs rqSubs = do + withTokenServer $ \ntfServer -> do + let subsToReset = map (toResetSub ntfServer) rqSubs + lift $ void $ withStoreBatch' c (\db -> map (\sub -> supervisorUpdateNtfSub db sub (NSASMP NSASmpKey)) subsToReset) + let rqs = map fst rqSubs + kickSMPWorkers rqs + where + toResetSub :: NtfServer -> (RcvQueue, NtfSubscription) -> NtfSubscription + toResetSub ntfServer (rq, sub) = + let RcvQueue {server = smpServer} = rq + in sub {smpServer, ntfQueueId = Nothing, ntfServer, ntfSubId = Nothing, ntfSubStatus = NASNew} + partitionQueueSubActions :: + [(RcvQueue, Maybe NtfSupervisorSub)] -> + ( [RcvQueue], -- new subs + [(RcvQueue, NtfSubscription)], -- reset subs + [SMPServer], -- continue work (SMP) + [NtfServer] -- continue work (Ntf) + ) + partitionQueueSubActions = foldr' decideSubWork ([], [], [], []) + where + -- sub = Nothing, needs to be created + decideSubWork (rq, Nothing) (ns, rs, css, cns) = (rq : ns, rs, css, cns) + decideSubWork (rq, Just (sub, subAction_)) (ns, rs, css, cns) = + case (clientNtfCreds rq, ntfQueueId sub) of + -- notifier ID created on SMP server (on ntf server subscription can be registered or not yet), + -- need to clarify action + (Just ClientNtfCreds {notifierId}, Just ntfQueueId') + | sameSrvAddr (qServer rq) subSMPServer && notifierId == ntfQueueId' -> contOrReset + | otherwise -> reset + (Nothing, Nothing) -> contOrReset + _ -> reset + where + NtfSubscription {ntfServer = subNtfServer, smpServer = subSMPServer} = sub + contOrReset = case subAction_ of + -- action was set to NULL after worker internal error + Nothing -> reset + Just (action, _) + -- subscription was marked for deletion / is being deleted + | isDeleteNtfSubAction action -> reset + -- continue work on subscription (e.g. supervisor was repeatedly tasked with creating a subscription) + | otherwise -> case action of + NSASMP _ -> (ns, rs, qServer rq : css, cns) + NSANtf _ -> (ns, rs, css, subNtfServer : cns) + reset = (ns, (rq, sub) : rs, css, cns) NSCSmpDelete -> do - withStore' c (`getPrimaryRcvQueue` connId) >>= \case - Right rq@RcvQueue {server = smpServer} -> do - logInfo $ "processNtfSub, NSCSmpDelete - rq = " <> tshow rq - withStore' c $ \db -> supervisorUpdateNtfAction db connId (NSASMP NSASmpDelete) - lift . void $ getNtfSMPWorker True c smpServer - _ -> notifyInternalError c connId "NSCSmpDelete - no rcv queue" + rqs <- lift $ rights <$> withStoreBatch c (\db -> map (fmap (first storeError) . getPrimaryRcvQueue db) (L.toList connIds)) + logInfo $ "processNtfCmd, NSCSmpDelete - length rqs = " <> tshow (length rqs) + lift $ void $ withStoreBatch' c (\db -> map (\rq -> supervisorUpdateNtfAction db (qConnId rq) (NSASMP NSASmpDelete)) rqs) + kickSMPWorkers rqs NSCNtfWorker ntfServer -> lift . void $ getNtfNTFWorker True c ntfServer NSCNtfSMPWorker smpServer -> lift . void $ getNtfSMPWorker True c smpServer - NSCDeleteSub -> withStore' c $ \db -> deleteNtfSubscription' db connId + NSCDeleteSub -> void $ lift $ withStoreBatch' c $ \db -> map (deleteNtfSubscription' db) (L.toList connIds) + where + kickSMPWorkers :: [RcvQueue] -> AM () + kickSMPWorkers rqs = do + let smpServers = S.fromList $ map qServer rqs + lift $ mapM_ (getNtfSMPWorker True c) smpServers getNtfNTFWorker :: Bool -> AgentClient -> NtfServer -> AM' Worker getNtfNTFWorker hasWork c server = do @@ -173,7 +220,7 @@ runNtfWorker c srv Worker {doWork} = withStore' c $ \db -> updateNtfSubscription db sub {ntfServer, ntfQueueId = Nothing, ntfSubId = Nothing, ntfSubStatus = NASNew} (NSASMP NSASmpKey) ts ns <- asks ntfSupervisor - atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCNtfSMPWorker smpServer) + atomically $ writeTBQueue (ntfSubQ ns) (NSCNtfSMPWorker smpServer, [connId]) status -> updateSubNextCheck ts status atomically $ incNtfServerStat c userId ntfServer ntfChecked Nothing -> workerInternalError c connId "NSACheck - no subscription ID" @@ -184,12 +231,12 @@ runNtfWorker c srv Worker {doWork} = let sub' = sub {ntfSubId = Nothing, ntfSubStatus = NASOff} withStore' c $ \db -> updateNtfSubscription db sub' (NSASMP NSASmpDelete) ts ns <- asks ntfSupervisor - atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCNtfSMPWorker smpServer) + atomically $ writeTBQueue (ntfSubQ ns) (NSCNtfSMPWorker smpServer, [connId]) -- TODO [batch ntf] loop NSARotate -> deleteNtfSub $ do withStore' c $ \db -> deleteNtfSubscription db connId ns <- asks ntfSupervisor - atomically $ writeTBQueue (ntfSubQ ns) (connId, NSCCreate) + atomically $ writeTBQueue (ntfSubQ ns) (NSCCreate, [connId]) -- TODO [batch ntf] loop where -- deleteNtfSub is only used in NSADelete and NSARotate, so also deprecated deleteNtfSub continue = case ntfSubId of @@ -248,7 +295,7 @@ runNtfSMPWorker c srv Worker {doWork} = do setRcvQueueNtfCreds db connId $ Just ClientNtfCreds {ntfPublicKey, ntfPrivateKey, notifierId, rcvNtfDhSecret} updateNtfSubscription db sub {ntfQueueId = Just notifierId, ntfSubStatus = NASKey} (NSANtf NSACreate) ts ns <- asks ntfSupervisor - atomically $ sendNtfSubCommand ns (connId, NSCNtfWorker ntfServer) + atomically $ sendNtfSubCommand ns (NSCNtfWorker ntfServer, [connId]) _ -> workerInternalError c connId "NSASmpKey - no active token" NSASmpDelete -> do -- TODO should we remove it after successful removal from the server? @@ -292,6 +339,10 @@ notifyInternalError :: MonadIO m => AgentClient -> ConnId -> String -> m () notifyInternalError AgentClient {subQ} connId internalErrStr = atomically $ writeTBQueue subQ ("", connId, AEvt SAEConn $ ERR $ INTERNAL internalErrStr) {-# INLINE notifyInternalError #-} +notifyInternalError' :: MonadIO m => AgentClient -> String -> m () +notifyInternalError' AgentClient {subQ} internalErrStr = atomically $ writeTBQueue subQ ("", "", AEvt SAEConn $ ERR $ INTERNAL internalErrStr) +{-# INLINE notifyInternalError' #-} + getNtfToken :: AM' (Maybe NtfToken) getNtfToken = do tkn <- asks $ ntfTkn . ntfSupervisor @@ -303,7 +354,7 @@ nsUpdateToken ns tkn = writeTVar (ntfTkn ns) $ Just tkn nsRemoveNtfToken :: NtfSupervisor -> STM () nsRemoveNtfToken ns = writeTVar (ntfTkn ns) Nothing -sendNtfSubCommand :: NtfSupervisor -> (ConnId, NtfSupervisorCommand) -> STM () +sendNtfSubCommand :: NtfSupervisor -> (NtfSupervisorCommand, NonEmpty ConnId) -> STM () sendNtfSubCommand ns cmd = do tkn <- readTVar (ntfTkn ns) when (instantNotifications tkn) $ writeTBQueue (ntfSubQ ns) cmd diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index f354ad530..55f1ab85e 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -151,6 +151,7 @@ module Simplex.Messaging.Agent.Store.SQLite updateNtfToken, removeNtfToken, -- Notification subscription persistence + NtfSupervisorSub, getNtfSubscription, createNtfSubscription, supervisorUpdateNtfSub, @@ -1476,7 +1477,9 @@ removeNtfToken db NtfToken {deviceToken = DeviceToken provider token, ntfServer |] (provider, token, host, port) -getNtfSubscription :: DB.Connection -> ConnId -> IO (Maybe (NtfSubscription, Maybe (NtfSubAction, NtfActionTs))) +type NtfSupervisorSub = (NtfSubscription, Maybe (NtfSubAction, NtfActionTs)) + +getNtfSubscription :: DB.Connection -> ConnId -> IO (Maybe NtfSupervisorSub) getNtfSubscription db connId = maybeFirstRow ntfSubscription $ DB.query