mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-31 11:56:08 +00:00
notifications: SNEW draft (#378)
This commit is contained in:
@@ -120,7 +120,7 @@ instance ToJSON NtfRegCode where
|
||||
|
||||
data NewNtfEntity (e :: NtfEntity) where
|
||||
NewNtfTkn :: DeviceToken -> C.APublicVerifyKey -> C.PublicKeyX25519 -> NewNtfEntity 'Token
|
||||
NewNtfSub :: NtfTokenId -> SMPQueueNtf -> NewNtfEntity 'Subscription -- NtfTokenId -> C.APublicVerifyKey -> SMPQueueNtf
|
||||
NewNtfSub :: NtfTokenId -> SMPQueueNtf -> NewNtfEntity 'Subscription
|
||||
|
||||
deriving instance Show (NewNtfEntity e)
|
||||
|
||||
|
||||
@@ -26,7 +26,7 @@ import Simplex.Messaging.Notifications.Server.Env
|
||||
import Simplex.Messaging.Notifications.Server.Push.APNS
|
||||
import Simplex.Messaging.Notifications.Server.Store
|
||||
import Simplex.Messaging.Notifications.Transport
|
||||
import Simplex.Messaging.Protocol (ErrorType (..), SignedTransmission, Transmission, encodeTransmission, tGet, tPut)
|
||||
import Simplex.Messaging.Protocol (ErrorType (..), SignedTransmission, Transmission, encodeTransmission, tGet, tPut, SMPServer)
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.Server
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
@@ -48,10 +48,12 @@ runNtfServerBlocking started cfg = runReaderT (ntfServer cfg started) =<< newNtf
|
||||
|
||||
ntfServer :: forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => NtfServerConfig -> TMVar Bool -> m ()
|
||||
ntfServer NtfServerConfig {transports} started = do
|
||||
s <- asks subscriber
|
||||
-- s <- asks subscriber
|
||||
ps <- asks pushServer
|
||||
raceAny_ (ntfSubscriber s : ntfPush ps : map runServer transports)
|
||||
raceAny_ (ntfPush ps : map runServer transports)
|
||||
where
|
||||
-- raceAny_ (ntfSubscriber s : ntfPush ps : map runServer transports)
|
||||
|
||||
runServer :: (ServiceName, ATransport) -> m ()
|
||||
runServer (tcpPort, ATransport t) = do
|
||||
serverParams <- asks tlsServerParams
|
||||
@@ -65,18 +67,29 @@ ntfServer NtfServerConfig {transports} started = do
|
||||
Left _ -> pure ()
|
||||
|
||||
ntfSubscriber :: forall m. MonadUnliftIO m => NtfSubscriber -> m ()
|
||||
ntfSubscriber NtfSubscriber {subQ, smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = do
|
||||
ntfSubscriber NtfSubscriber {newSubQ, smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = do
|
||||
raceAny_ [subscribe, receiveSMP, receiveAgent]
|
||||
where
|
||||
subscribe :: m ()
|
||||
subscribe = forever $ do
|
||||
atomically (readTBQueue subQ) >>= \case
|
||||
atomically (readTBQueue newSubQ) >>= \case
|
||||
-- TODO create workers, workers do subscription
|
||||
NtfSub NtfSubData {smpQueue} -> do
|
||||
let SMPQueueNtf {smpServer, notifierId, notifierKey} = smpQueue
|
||||
liftIO (runExceptT $ subscribeQueue ca smpServer ((SPNotifier, notifierId), notifierKey)) >>= \case
|
||||
Right _ -> pure () -- update subscription status
|
||||
Left _e -> pure ()
|
||||
|
||||
getSMPSubscriber :: (MonadUnliftIO m, MonadReader NtfEnv m) => SMPServer -> m SMPSubscriber
|
||||
getSMPSubscriber smpServer = do
|
||||
-- TODO check the map
|
||||
-- if not there, create env + start process (runNtfSubscriber)
|
||||
-- return env
|
||||
atomically $ newSMPSubscriber 0 -- todo get from config
|
||||
|
||||
runSMPSubscriber :: NtfSubscriber -> m ()
|
||||
runSMPSubscriber = forever $ pure ()
|
||||
|
||||
receiveSMP :: m ()
|
||||
receiveSMP = forever $ do
|
||||
(_srv, _sessId, _ntfId, msg) <- atomically $ readTBQueue msgQ
|
||||
@@ -188,12 +201,34 @@ verifyNtfTransmission (sig_, signed, (corrId, entId, _)) cmd = do
|
||||
| verifyCmdSignature sig_ signed tknVerifyKey -> verifiedTknCmd t c
|
||||
| otherwise -> VRFailed
|
||||
_ -> maybe False (dummyVerifyCmd signed) sig_ `seq` VRFailed
|
||||
_ -> pure VRFailed
|
||||
NtfCmd SSubscription c@(SNEW sub@(NewNtfSub tknId _)) -> do
|
||||
-- require that token is active, e.g. return Nothing from store if not
|
||||
(tkn_, sub_) <- atomically $ findNtfSubscription st sub
|
||||
case tkn_ of
|
||||
Just NtfTknData {tknVerifyKey} ->
|
||||
pure $
|
||||
if verifyCmdSignature sig_ signed tknVerifyKey
|
||||
then case sub_ of
|
||||
Just s@NtfSubData {tokenId}
|
||||
| tknId == tokenId -> verifiedSubCmd s c
|
||||
| otherwise -> VRFailed
|
||||
_ -> VRVerified (NtfReqNew corrId (ANE SSubscription sub))
|
||||
else VRFailed
|
||||
_ -> pure $ maybe False (dummyVerifyCmd signed) sig_ `seq` VRFailed
|
||||
NtfCmd SSubscription c -> do
|
||||
-- require that token is active, e.g. return Nothing from store if not
|
||||
r_ <- atomically $ getNtfSubscription st entId
|
||||
pure $ case r_ of
|
||||
Just (s, NtfTknData {tknVerifyKey})
|
||||
| verifyCmdSignature sig_ signed tknVerifyKey -> verifiedSubCmd s c
|
||||
| otherwise -> VRFailed
|
||||
_ -> maybe False (dummyVerifyCmd signed) sig_ `seq` VRFailed
|
||||
where
|
||||
verifiedTknCmd t c = VRVerified (NtfReqCmd SToken (NtfTkn t) (corrId, entId, c))
|
||||
verifiedSubCmd s c = VRVerified (NtfReqCmd SSubscription (NtfSub s) (corrId, entId, c))
|
||||
|
||||
client :: forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => NtfServerClient -> NtfSubscriber -> NtfPushServer -> m ()
|
||||
client NtfServerClient {rcvQ, sndQ} NtfSubscriber {subQ = _} NtfPushServer {pushQ, intervalNotifiers} =
|
||||
client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ = _} NtfPushServer {pushQ, intervalNotifiers} =
|
||||
forever $
|
||||
atomically (readTBQueue rcvQ)
|
||||
>>= processCommand
|
||||
@@ -267,10 +302,24 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {subQ = _} NtfPushServer {push
|
||||
intervalNotifier delay = forever $ do
|
||||
threadDelay delay
|
||||
atomically $ writeTBQueue pushQ (tkn, PNCheckMessages)
|
||||
NtfReqNew corrId (ANE SSubscription _newSub) -> pure (corrId, "", NROk)
|
||||
NtfReqNew corrId (ANE SSubscription newSub) -> do
|
||||
st <- asks store
|
||||
subId <- getId
|
||||
atomically $ do
|
||||
sub <- mkNtfSubData newSub
|
||||
addNtfSubscription st subId sub
|
||||
-- TODO launch NSUB
|
||||
-- writeTBQueue subQ ("", connId, cmd)
|
||||
-- TODO response ID
|
||||
pure (corrId, "", NROk)
|
||||
NtfReqCmd SSubscription _sub (corrId, subId, cmd) ->
|
||||
(corrId,subId,) <$> case cmd of
|
||||
SNEW _newSub -> pure NROk
|
||||
SNEW _newSub -> do
|
||||
logDebug "SNEW - registered subscription"
|
||||
-- st <- asks store
|
||||
-- TODO check NKEY?
|
||||
-- TODO response ID
|
||||
pure NROk
|
||||
SCHK -> pure NROk
|
||||
SDEL -> pure NROk
|
||||
PING -> pure NRPong
|
||||
|
||||
@@ -23,7 +23,7 @@ import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Notifications.Protocol
|
||||
import Simplex.Messaging.Notifications.Server.Push.APNS
|
||||
import Simplex.Messaging.Notifications.Server.Store
|
||||
import Simplex.Messaging.Protocol (CorrId, Transmission)
|
||||
import Simplex.Messaging.Protocol (CorrId, Transmission, SMPServer)
|
||||
import Simplex.Messaging.Server.Expiration
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
@@ -78,15 +78,26 @@ newNtfServerEnv config@NtfServerConfig {subQSize, pushQSize, smpAgentCfg, apnsCo
|
||||
pure NtfEnv {config, subscriber, pushServer, store, idsDrg, tlsServerParams, serverIdentity = C.KeyHash fp}
|
||||
|
||||
data NtfSubscriber = NtfSubscriber
|
||||
{ subQ :: TBQueue (NtfEntityRec 'Subscription),
|
||||
{ smpSubscribers :: TMap SMPServer SMPSubscriber,
|
||||
newSubQ :: TBQueue (NtfEntityRec 'Subscription),
|
||||
smpAgent :: SMPClientAgent
|
||||
}
|
||||
|
||||
newNtfSubscriber :: Natural -> SMPClientAgentConfig -> STM NtfSubscriber
|
||||
newNtfSubscriber qSize smpAgentCfg = do
|
||||
smpSubscribers <- TM.empty
|
||||
newSubQ <- newTBQueue qSize
|
||||
smpAgent <- newSMPClientAgent smpAgentCfg
|
||||
subQ <- newTBQueue qSize
|
||||
pure NtfSubscriber {smpAgent, subQ}
|
||||
pure NtfSubscriber {smpSubscribers, newSubQ, smpAgent}
|
||||
|
||||
newtype SMPSubscriber = SMPSubscriber
|
||||
{ newSubQ :: TBQueue (NtfEntityRec 'Subscription)
|
||||
}
|
||||
|
||||
newSMPSubscriber :: Natural -> STM SMPSubscriber
|
||||
newSMPSubscriber qSize = do
|
||||
newSubQ <- newTBQueue qSize
|
||||
pure SMPSubscriber {newSubQ}
|
||||
|
||||
data NtfPushServer = NtfPushServer
|
||||
{ pushQ :: TBQueue (NtfTknData, PushNotification),
|
||||
|
||||
@@ -14,22 +14,30 @@ import Control.Concurrent.STM
|
||||
import Control.Monad
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Set (Set)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Notifications.Protocol
|
||||
import Simplex.Messaging.Protocol (NotifierId, ProtocolServer)
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Util (whenM, ($>>=))
|
||||
|
||||
data NtfStore = NtfStore
|
||||
{ tokens :: TMap NtfTokenId NtfTknData,
|
||||
tokenRegistrations :: TMap DeviceToken (TMap ByteString NtfTokenId)
|
||||
tokenRegistrations :: TMap DeviceToken (TMap ByteString NtfTokenId),
|
||||
subscriptions :: TMap NtfSubscriptionId NtfSubData,
|
||||
tokenSubscriptions :: TMap NtfTokenId (TVar (Set NtfSubscriptionId)),
|
||||
subscriptionLookup :: TMap (NtfTokenId, ProtocolServer, NotifierId) NtfSubscriptionId
|
||||
}
|
||||
|
||||
newNtfStore :: STM NtfStore
|
||||
newNtfStore = do
|
||||
tokens <- TM.empty
|
||||
tokenRegistrations <- TM.empty
|
||||
pure NtfStore {tokens, tokenRegistrations}
|
||||
subscriptions <- TM.empty
|
||||
tokenSubscriptions <- TM.empty
|
||||
subscriptionLookup <- TM.empty
|
||||
pure NtfStore {tokens, tokenRegistrations, subscriptions, tokenSubscriptions, subscriptionLookup}
|
||||
|
||||
data NtfTknData = NtfTknData
|
||||
{ ntfTknId :: NtfTokenId,
|
||||
@@ -118,6 +126,33 @@ deleteNtfToken st tknId = do
|
||||
regs = tokenRegistrations st
|
||||
regKey = C.toPubKey C.pubKeyBytes
|
||||
|
||||
getNtfSubscription :: NtfStore -> NtfSubscriptionId -> STM (Maybe (NtfSubData, NtfTknData))
|
||||
getNtfSubscription st subId = pure Nothing -- TM.lookup subId (subscriptions st)
|
||||
|
||||
findNtfSubscription :: NtfStore -> NewNtfEntity 'Subscription -> STM (Maybe NtfTknData, Maybe NtfSubData)
|
||||
findNtfSubscription st (NewNtfSub tknId smpQueue) = pure (Nothing, Nothing)
|
||||
|
||||
-- findNtfSubscription :: NtfStore -> NewNtfEntity 'Subscription -> STM (Maybe NtfSubData)
|
||||
-- findNtfSubscription st (NewNtfSub tknId smpQueue) =
|
||||
-- TM.lookup (tknId, smpQueue) (subscriptionLookup st)
|
||||
-- $>>= (`TM.lookup` subscriptions st)
|
||||
|
||||
mkNtfSubData :: NewNtfEntity 'Subscription -> STM NtfSubData
|
||||
mkNtfSubData (NewNtfSub tokenId smpQueue) = do
|
||||
subStatus <- newTVar NSNew
|
||||
pure NtfSubData {smpQueue, tokenId, subStatus}
|
||||
|
||||
addNtfSubscription :: NtfStore -> NtfSubscriptionId -> NtfSubData -> STM ()
|
||||
addNtfSubscription st subId sub@NtfSubData {smpQueue = SMPQueueNtf {smpServer, notifierId}, tokenId} = do
|
||||
TM.insert subId sub $ subscriptions st
|
||||
TM.lookup (tokenId, smpServer, notifierId) sRegs
|
||||
>>= ( \case
|
||||
Nothing -> pure ()
|
||||
Just _ -> pure ()
|
||||
)
|
||||
where
|
||||
sRegs = subscriptionLookup st
|
||||
|
||||
-- getNtfRec :: NtfStore -> SNtfEntity e -> NtfEntityId -> STM (Maybe (NtfEntityRec e))
|
||||
-- getNtfRec st ent entId = case ent of
|
||||
-- SToken -> NtfTkn <$$> TM.lookup entId (tokens st)
|
||||
|
||||
Reference in New Issue
Block a user