From b60408837a35569eddb87661ebdd4a73a275e4e4 Mon Sep 17 00:00:00 2001 From: JRoberts <8711996+jr-simplex@users.noreply.github.com> Date: Wed, 1 Jun 2022 18:42:35 +0400 Subject: [PATCH] notifications: SNEW draft (#378) --- .../Messaging/Notifications/Protocol.hs | 2 +- src/Simplex/Messaging/Notifications/Server.hs | 67 ++++++++++++++++--- .../Messaging/Notifications/Server/Env.hs | 19 ++++-- .../Messaging/Notifications/Server/Store.hs | 39 ++++++++++- 4 files changed, 111 insertions(+), 16 deletions(-) diff --git a/src/Simplex/Messaging/Notifications/Protocol.hs b/src/Simplex/Messaging/Notifications/Protocol.hs index 04fb3e21f..3640004e0 100644 --- a/src/Simplex/Messaging/Notifications/Protocol.hs +++ b/src/Simplex/Messaging/Notifications/Protocol.hs @@ -120,7 +120,7 @@ instance ToJSON NtfRegCode where data NewNtfEntity (e :: NtfEntity) where NewNtfTkn :: DeviceToken -> C.APublicVerifyKey -> C.PublicKeyX25519 -> NewNtfEntity 'Token - NewNtfSub :: NtfTokenId -> SMPQueueNtf -> NewNtfEntity 'Subscription -- NtfTokenId -> C.APublicVerifyKey -> SMPQueueNtf + NewNtfSub :: NtfTokenId -> SMPQueueNtf -> NewNtfEntity 'Subscription deriving instance Show (NewNtfEntity e) diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 2bb412b39..a8e225fad 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -26,7 +26,7 @@ import Simplex.Messaging.Notifications.Server.Env import Simplex.Messaging.Notifications.Server.Push.APNS import Simplex.Messaging.Notifications.Server.Store import Simplex.Messaging.Notifications.Transport -import Simplex.Messaging.Protocol (ErrorType (..), SignedTransmission, Transmission, encodeTransmission, tGet, tPut) +import Simplex.Messaging.Protocol (ErrorType (..), SignedTransmission, Transmission, encodeTransmission, tGet, tPut, SMPServer) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Server import qualified Simplex.Messaging.TMap as TM @@ -48,10 +48,12 @@ runNtfServerBlocking started cfg = runReaderT (ntfServer cfg started) =<< newNtf ntfServer :: forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => NtfServerConfig -> TMVar Bool -> m () ntfServer NtfServerConfig {transports} started = do - s <- asks subscriber + -- s <- asks subscriber ps <- asks pushServer - raceAny_ (ntfSubscriber s : ntfPush ps : map runServer transports) + raceAny_ (ntfPush ps : map runServer transports) where + -- raceAny_ (ntfSubscriber s : ntfPush ps : map runServer transports) + runServer :: (ServiceName, ATransport) -> m () runServer (tcpPort, ATransport t) = do serverParams <- asks tlsServerParams @@ -65,18 +67,29 @@ ntfServer NtfServerConfig {transports} started = do Left _ -> pure () ntfSubscriber :: forall m. MonadUnliftIO m => NtfSubscriber -> m () -ntfSubscriber NtfSubscriber {subQ, smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = do +ntfSubscriber NtfSubscriber {newSubQ, smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = do raceAny_ [subscribe, receiveSMP, receiveAgent] where subscribe :: m () subscribe = forever $ do - atomically (readTBQueue subQ) >>= \case + atomically (readTBQueue newSubQ) >>= \case + -- TODO create workers, workers do subscription NtfSub NtfSubData {smpQueue} -> do let SMPQueueNtf {smpServer, notifierId, notifierKey} = smpQueue liftIO (runExceptT $ subscribeQueue ca smpServer ((SPNotifier, notifierId), notifierKey)) >>= \case Right _ -> pure () -- update subscription status Left _e -> pure () + getSMPSubscriber :: (MonadUnliftIO m, MonadReader NtfEnv m) => SMPServer -> m SMPSubscriber + getSMPSubscriber smpServer = do + -- TODO check the map + -- if not there, create env + start process (runNtfSubscriber) + -- return env + atomically $ newSMPSubscriber 0 -- todo get from config + + runSMPSubscriber :: NtfSubscriber -> m () + runSMPSubscriber = forever $ pure () + receiveSMP :: m () receiveSMP = forever $ do (_srv, _sessId, _ntfId, msg) <- atomically $ readTBQueue msgQ @@ -188,12 +201,34 @@ verifyNtfTransmission (sig_, signed, (corrId, entId, _)) cmd = do | verifyCmdSignature sig_ signed tknVerifyKey -> verifiedTknCmd t c | otherwise -> VRFailed _ -> maybe False (dummyVerifyCmd signed) sig_ `seq` VRFailed - _ -> pure VRFailed + NtfCmd SSubscription c@(SNEW sub@(NewNtfSub tknId _)) -> do + -- require that token is active, e.g. return Nothing from store if not + (tkn_, sub_) <- atomically $ findNtfSubscription st sub + case tkn_ of + Just NtfTknData {tknVerifyKey} -> + pure $ + if verifyCmdSignature sig_ signed tknVerifyKey + then case sub_ of + Just s@NtfSubData {tokenId} + | tknId == tokenId -> verifiedSubCmd s c + | otherwise -> VRFailed + _ -> VRVerified (NtfReqNew corrId (ANE SSubscription sub)) + else VRFailed + _ -> pure $ maybe False (dummyVerifyCmd signed) sig_ `seq` VRFailed + NtfCmd SSubscription c -> do + -- require that token is active, e.g. return Nothing from store if not + r_ <- atomically $ getNtfSubscription st entId + pure $ case r_ of + Just (s, NtfTknData {tknVerifyKey}) + | verifyCmdSignature sig_ signed tknVerifyKey -> verifiedSubCmd s c + | otherwise -> VRFailed + _ -> maybe False (dummyVerifyCmd signed) sig_ `seq` VRFailed where verifiedTknCmd t c = VRVerified (NtfReqCmd SToken (NtfTkn t) (corrId, entId, c)) + verifiedSubCmd s c = VRVerified (NtfReqCmd SSubscription (NtfSub s) (corrId, entId, c)) client :: forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => NtfServerClient -> NtfSubscriber -> NtfPushServer -> m () -client NtfServerClient {rcvQ, sndQ} NtfSubscriber {subQ = _} NtfPushServer {pushQ, intervalNotifiers} = +client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ = _} NtfPushServer {pushQ, intervalNotifiers} = forever $ atomically (readTBQueue rcvQ) >>= processCommand @@ -267,10 +302,24 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {subQ = _} NtfPushServer {push intervalNotifier delay = forever $ do threadDelay delay atomically $ writeTBQueue pushQ (tkn, PNCheckMessages) - NtfReqNew corrId (ANE SSubscription _newSub) -> pure (corrId, "", NROk) + NtfReqNew corrId (ANE SSubscription newSub) -> do + st <- asks store + subId <- getId + atomically $ do + sub <- mkNtfSubData newSub + addNtfSubscription st subId sub + -- TODO launch NSUB + -- writeTBQueue subQ ("", connId, cmd) + -- TODO response ID + pure (corrId, "", NROk) NtfReqCmd SSubscription _sub (corrId, subId, cmd) -> (corrId,subId,) <$> case cmd of - SNEW _newSub -> pure NROk + SNEW _newSub -> do + logDebug "SNEW - registered subscription" + -- st <- asks store + -- TODO check NKEY? + -- TODO response ID + pure NROk SCHK -> pure NROk SDEL -> pure NROk PING -> pure NRPong diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs index aaf04beb4..26a2a7de9 100644 --- a/src/Simplex/Messaging/Notifications/Server/Env.hs +++ b/src/Simplex/Messaging/Notifications/Server/Env.hs @@ -23,7 +23,7 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Notifications.Protocol import Simplex.Messaging.Notifications.Server.Push.APNS import Simplex.Messaging.Notifications.Server.Store -import Simplex.Messaging.Protocol (CorrId, Transmission) +import Simplex.Messaging.Protocol (CorrId, Transmission, SMPServer) import Simplex.Messaging.Server.Expiration import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -78,15 +78,26 @@ newNtfServerEnv config@NtfServerConfig {subQSize, pushQSize, smpAgentCfg, apnsCo pure NtfEnv {config, subscriber, pushServer, store, idsDrg, tlsServerParams, serverIdentity = C.KeyHash fp} data NtfSubscriber = NtfSubscriber - { subQ :: TBQueue (NtfEntityRec 'Subscription), + { smpSubscribers :: TMap SMPServer SMPSubscriber, + newSubQ :: TBQueue (NtfEntityRec 'Subscription), smpAgent :: SMPClientAgent } newNtfSubscriber :: Natural -> SMPClientAgentConfig -> STM NtfSubscriber newNtfSubscriber qSize smpAgentCfg = do + smpSubscribers <- TM.empty + newSubQ <- newTBQueue qSize smpAgent <- newSMPClientAgent smpAgentCfg - subQ <- newTBQueue qSize - pure NtfSubscriber {smpAgent, subQ} + pure NtfSubscriber {smpSubscribers, newSubQ, smpAgent} + +newtype SMPSubscriber = SMPSubscriber + { newSubQ :: TBQueue (NtfEntityRec 'Subscription) + } + +newSMPSubscriber :: Natural -> STM SMPSubscriber +newSMPSubscriber qSize = do + newSubQ <- newTBQueue qSize + pure SMPSubscriber {newSubQ} data NtfPushServer = NtfPushServer { pushQ :: TBQueue (NtfTknData, PushNotification), diff --git a/src/Simplex/Messaging/Notifications/Server/Store.hs b/src/Simplex/Messaging/Notifications/Server/Store.hs index e39e7d3e1..d0b2e9d7d 100644 --- a/src/Simplex/Messaging/Notifications/Server/Store.hs +++ b/src/Simplex/Messaging/Notifications/Server/Store.hs @@ -14,22 +14,30 @@ import Control.Concurrent.STM import Control.Monad import Data.ByteString.Char8 (ByteString) import qualified Data.Map.Strict as M +import Data.Set (Set) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Notifications.Protocol +import Simplex.Messaging.Protocol (NotifierId, ProtocolServer) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util (whenM, ($>>=)) data NtfStore = NtfStore { tokens :: TMap NtfTokenId NtfTknData, - tokenRegistrations :: TMap DeviceToken (TMap ByteString NtfTokenId) + tokenRegistrations :: TMap DeviceToken (TMap ByteString NtfTokenId), + subscriptions :: TMap NtfSubscriptionId NtfSubData, + tokenSubscriptions :: TMap NtfTokenId (TVar (Set NtfSubscriptionId)), + subscriptionLookup :: TMap (NtfTokenId, ProtocolServer, NotifierId) NtfSubscriptionId } newNtfStore :: STM NtfStore newNtfStore = do tokens <- TM.empty tokenRegistrations <- TM.empty - pure NtfStore {tokens, tokenRegistrations} + subscriptions <- TM.empty + tokenSubscriptions <- TM.empty + subscriptionLookup <- TM.empty + pure NtfStore {tokens, tokenRegistrations, subscriptions, tokenSubscriptions, subscriptionLookup} data NtfTknData = NtfTknData { ntfTknId :: NtfTokenId, @@ -118,6 +126,33 @@ deleteNtfToken st tknId = do regs = tokenRegistrations st regKey = C.toPubKey C.pubKeyBytes +getNtfSubscription :: NtfStore -> NtfSubscriptionId -> STM (Maybe (NtfSubData, NtfTknData)) +getNtfSubscription st subId = pure Nothing -- TM.lookup subId (subscriptions st) + +findNtfSubscription :: NtfStore -> NewNtfEntity 'Subscription -> STM (Maybe NtfTknData, Maybe NtfSubData) +findNtfSubscription st (NewNtfSub tknId smpQueue) = pure (Nothing, Nothing) + +-- findNtfSubscription :: NtfStore -> NewNtfEntity 'Subscription -> STM (Maybe NtfSubData) +-- findNtfSubscription st (NewNtfSub tknId smpQueue) = +-- TM.lookup (tknId, smpQueue) (subscriptionLookup st) +-- $>>= (`TM.lookup` subscriptions st) + +mkNtfSubData :: NewNtfEntity 'Subscription -> STM NtfSubData +mkNtfSubData (NewNtfSub tokenId smpQueue) = do + subStatus <- newTVar NSNew + pure NtfSubData {smpQueue, tokenId, subStatus} + +addNtfSubscription :: NtfStore -> NtfSubscriptionId -> NtfSubData -> STM () +addNtfSubscription st subId sub@NtfSubData {smpQueue = SMPQueueNtf {smpServer, notifierId}, tokenId} = do + TM.insert subId sub $ subscriptions st + TM.lookup (tokenId, smpServer, notifierId) sRegs + >>= ( \case + Nothing -> pure () + Just _ -> pure () + ) + where + sRegs = subscriptionLookup st + -- getNtfRec :: NtfStore -> SNtfEntity e -> NtfEntityId -> STM (Maybe (NtfEntityRec e)) -- getNtfRec st ent entId = case ent of -- SToken -> NtfTkn <$$> TM.lookup entId (tokens st)