mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-06-08 15:12:23 +00:00
ntf server: record token invalidation reason, add date of the last token activity (#1449)
* ntf server: record token invalidation reason, add date of the last token activity * update time * rename * optional * include token ID in delivery error * version * protocol version * fix, log error
This commit is contained in:
+1
-1
@@ -1,7 +1,7 @@
|
||||
cabal-version: 1.12
|
||||
|
||||
name: simplexmq
|
||||
version: 6.3.0.3
|
||||
version: 6.3.0.301
|
||||
synopsis: SimpleXMQ message broker
|
||||
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
|
||||
<./docs/Simplex-Messaging-Client.html client> and
|
||||
|
||||
@@ -11,7 +11,7 @@
|
||||
|
||||
module Simplex.Messaging.Notifications.Protocol where
|
||||
|
||||
import Control.Applicative ((<|>))
|
||||
import Control.Applicative (optional, (<|>))
|
||||
import Data.Aeson (FromJSON (..), ToJSON (..), (.:), (.=))
|
||||
import qualified Data.Aeson as J
|
||||
import qualified Data.Aeson.Encoding as JE
|
||||
@@ -32,7 +32,7 @@ import Simplex.Messaging.Agent.Store.DB (FromField (..), ToField (..))
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Notifications.Transport (NTFVersion, ntfClientHandshake)
|
||||
import Simplex.Messaging.Notifications.Transport (NTFVersion, invalidReasonNTFVersion, ntfClientHandshake)
|
||||
import Simplex.Messaging.Parsers (fromTextField_)
|
||||
import Simplex.Messaging.Protocol hiding (Command (..), CommandTag (..))
|
||||
import Simplex.Messaging.Util (eitherToMaybe, (<$?>))
|
||||
@@ -296,12 +296,18 @@ data NtfResponse
|
||||
|
||||
instance ProtocolEncoding NTFVersion ErrorType NtfResponse where
|
||||
type Tag NtfResponse = NtfResponseTag
|
||||
encodeProtocol _v = \case
|
||||
encodeProtocol v = \case
|
||||
NRTknId entId dhKey -> e (NRTknId_, ' ', entId, dhKey)
|
||||
NRSubId entId -> e (NRSubId_, ' ', entId)
|
||||
NROk -> e NROk_
|
||||
NRErr err -> e (NRErr_, ' ', err)
|
||||
NRTkn stat -> e (NRTkn_, ' ', stat)
|
||||
NRTkn stat -> e (NRTkn_, ' ', stat')
|
||||
where
|
||||
stat'
|
||||
| v >= invalidReasonNTFVersion = stat
|
||||
| otherwise = case stat of
|
||||
NTInvalid _ -> NTInvalid Nothing
|
||||
_ -> stat
|
||||
NRSub stat -> e (NRSub_, ' ', stat)
|
||||
NRPong -> e NRPong_
|
||||
where
|
||||
@@ -520,7 +526,7 @@ data NtfTknStatus
|
||||
| -- | state after registration (TNEW)
|
||||
NTRegistered
|
||||
| -- | if initial notification failed (push provider error) or verification failed
|
||||
NTInvalid
|
||||
NTInvalid (Maybe NTInvalidReason)
|
||||
| -- | Token confirmed via notification (accepted by push provider or verification code received by client)
|
||||
NTConfirmed
|
||||
| -- | after successful verification (TVFY)
|
||||
@@ -533,7 +539,7 @@ instance Encoding NtfTknStatus where
|
||||
smpEncode = \case
|
||||
NTNew -> "NEW"
|
||||
NTRegistered -> "REGISTERED"
|
||||
NTInvalid -> "INVALID"
|
||||
NTInvalid r_ -> "INVALID" <> maybe "" (\r -> ',' `B.cons` strEncode r) r_
|
||||
NTConfirmed -> "CONFIRMED"
|
||||
NTActive -> "ACTIVE"
|
||||
NTExpired -> "EXPIRED"
|
||||
@@ -541,12 +547,31 @@ instance Encoding NtfTknStatus where
|
||||
A.takeTill (== ' ') >>= \case
|
||||
"NEW" -> pure NTNew
|
||||
"REGISTERED" -> pure NTRegistered
|
||||
"INVALID" -> pure NTInvalid
|
||||
"INVALID" -> NTInvalid <$> optional (A.char ',' *> strP)
|
||||
"CONFIRMED" -> pure NTConfirmed
|
||||
"ACTIVE" -> pure NTActive
|
||||
"EXPIRED" -> pure NTExpired
|
||||
_ -> fail "bad NtfTknStatus"
|
||||
|
||||
instance StrEncoding NTInvalidReason where
|
||||
strEncode = smpEncode
|
||||
strP = smpP
|
||||
|
||||
data NTInvalidReason = NTIRBadToken | NTIRTokenNotForTopic | NTIRGone410
|
||||
deriving (Eq, Show)
|
||||
|
||||
instance Encoding NTInvalidReason where
|
||||
smpEncode = \case
|
||||
NTIRBadToken -> "BAD"
|
||||
NTIRTokenNotForTopic -> "TOPIC"
|
||||
NTIRGone410 -> "GONE"
|
||||
smpP =
|
||||
A.takeTill (== ' ') >>= \case
|
||||
"BAD" -> pure NTIRBadToken
|
||||
"TOPIC" -> pure NTIRTokenNotForTopic
|
||||
"GONE" -> pure NTIRGone410
|
||||
_ -> fail "bad NTInvalidReason"
|
||||
|
||||
instance StrEncoding NtfTknStatus where
|
||||
strEncode = smpEncode
|
||||
strP = smpP
|
||||
|
||||
@@ -58,6 +58,7 @@ import Simplex.Messaging.Protocol (EntityId (..), ErrorType (..), ProtocolServer
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.Server
|
||||
import Simplex.Messaging.Server.Control (CPClientRole (..))
|
||||
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime, getSystemDate)
|
||||
import Simplex.Messaging.Server.Stats (PeriodStats (..), PeriodStatCounts (..), periodStatCounts, updatePeriodStats)
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
@@ -435,17 +436,19 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do
|
||||
liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp)
|
||||
status <- readTVarIO tknStatus
|
||||
case ntf of
|
||||
PNVerification _
|
||||
| status /= NTInvalid && status /= NTExpired ->
|
||||
deliverNotification pp tkn ntf >>= \case
|
||||
Right _ -> do
|
||||
status_ <- atomically $ stateTVar tknStatus $ \case
|
||||
NTActive -> (Nothing, NTActive)
|
||||
NTConfirmed -> (Nothing, NTConfirmed)
|
||||
_ -> (Just NTConfirmed, NTConfirmed)
|
||||
forM_ status_ $ \status' -> withNtfLog $ \sl -> logTokenStatus sl ntfTknId status'
|
||||
_ -> pure ()
|
||||
| otherwise -> logError "bad notification token status"
|
||||
PNVerification _ -> case status of
|
||||
NTInvalid _ -> logError $ "bad notification token status: " <> tshow status
|
||||
-- TODO nothing makes token "expired" on the server
|
||||
NTExpired -> logError $ "bad notification token status: " <> tshow status
|
||||
_ ->
|
||||
deliverNotification pp tkn ntf >>= \case
|
||||
Right _ -> do
|
||||
status_ <- atomically $ stateTVar tknStatus $ \case
|
||||
NTActive -> (Nothing, NTActive)
|
||||
NTConfirmed -> (Nothing, NTConfirmed)
|
||||
_ -> (Just NTConfirmed, NTConfirmed)
|
||||
forM_ status_ $ \status' -> withNtfLog $ \sl -> logTokenStatus sl ntfTknId status'
|
||||
_ -> pure ()
|
||||
PNCheckMessages -> checkActiveTkn status $ do
|
||||
void $ deliverNotification pp tkn ntf
|
||||
PNMessage {} -> checkActiveTkn status $ do
|
||||
@@ -459,7 +462,7 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do
|
||||
| status == NTActive = action
|
||||
| otherwise = liftIO $ logError "bad notification token status"
|
||||
deliverNotification :: PushProvider -> NtfTknData -> PushNotification -> M (Either PushProviderError ())
|
||||
deliverNotification pp tkn ntf = do
|
||||
deliverNotification pp tkn@NtfTknData {ntfTknId} ntf = do
|
||||
deliver <- liftIO $ getPushClient s pp
|
||||
liftIO (runExceptT $ deliver tkn ntf) >>= \case
|
||||
Right _ -> pure $ Right ()
|
||||
@@ -468,14 +471,14 @@ ntfPush s@NtfPushServer {pushQ} = forever $ do
|
||||
PPRetryLater -> retryDeliver
|
||||
PPCryptoError _ -> err e
|
||||
PPResponseError _ _ -> err e
|
||||
PPTokenInvalid -> updateTknStatus tkn NTInvalid >> err e
|
||||
PPTokenInvalid r -> updateTknStatus tkn (NTInvalid $ Just r) >> err e
|
||||
PPPermanentError -> err e
|
||||
where
|
||||
retryDeliver :: M (Either PushProviderError ())
|
||||
retryDeliver = do
|
||||
deliver <- liftIO $ newPushClient s pp
|
||||
liftIO (runExceptT $ deliver tkn ntf) >>= either err (pure . Right)
|
||||
err e = logError (T.pack $ "Push provider error (" <> show pp <> "): " <> show e) $> Left e
|
||||
err e = logError ("Push provider error (" <> tshow pp <> ", " <> tshow ntfTknId <> "): " <> tshow e) $> Left e
|
||||
|
||||
updateTknStatus :: NtfTknData -> NtfTknStatus -> M ()
|
||||
updateTknStatus NtfTknData {ntfTknId, tknStatus} status = do
|
||||
@@ -509,13 +512,17 @@ receive th@THandle {params = THandleParams {thAuth}} NtfServerClient {rcvQ, sndQ
|
||||
where
|
||||
cmdAction t@(_, _, (corrId, entId, cmdOrError)) =
|
||||
case cmdOrError of
|
||||
Left e -> pure $ Left (corrId, entId, NRErr e)
|
||||
Left e -> do
|
||||
logError $ "invalid client request: " <> tshow e
|
||||
pure $ Left (corrId, entId, NRErr e)
|
||||
Right cmd ->
|
||||
verified <$> verifyNtfTransmission ((,C.cbNonce (SMP.bs corrId)) <$> thAuth) t cmd
|
||||
verified =<< verifyNtfTransmission ((,C.cbNonce (SMP.bs corrId)) <$> thAuth) t cmd
|
||||
where
|
||||
verified = \case
|
||||
VRVerified req -> Right req
|
||||
VRFailed -> Left (corrId, entId, NRErr AUTH)
|
||||
VRVerified req -> pure $ Right req
|
||||
VRFailed -> do
|
||||
logError "unauthorized client request"
|
||||
pure $ Left (corrId, entId, NRErr AUTH)
|
||||
write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty
|
||||
|
||||
send :: Transport c => THandleNTF c 'TServer -> NtfServerClient -> IO ()
|
||||
@@ -524,7 +531,7 @@ send h@THandle {params} NtfServerClient {sndQ, sndActiveAt} = forever $ do
|
||||
void . liftIO $ tPut h $ L.map (\t -> Right (Nothing, encodeTransmission params t)) ts
|
||||
atomically . (writeTVar sndActiveAt $!) =<< liftIO getSystemTime
|
||||
|
||||
data VerificationResult = VRVerified NtfRequest | VRFailed
|
||||
data VerificationResult = VRVerified (Maybe NtfTknData, NtfRequest) | VRFailed
|
||||
|
||||
verifyNtfTransmission :: Maybe (THandleAuth 'TServer, C.CbNonce) -> SignedTransmission ErrorType NtfCmd -> NtfCmd -> M VerificationResult
|
||||
verifyNtfTransmission auth_ (tAuth, authorized, (corrId, entId, _)) cmd = do
|
||||
@@ -538,34 +545,34 @@ verifyNtfTransmission auth_ (tAuth, authorized, (corrId, entId, _)) cmd = do
|
||||
Just t@NtfTknData {tknVerifyKey}
|
||||
| k == tknVerifyKey -> verifiedTknCmd t c
|
||||
| otherwise -> VRFailed
|
||||
_ -> VRVerified (NtfReqNew corrId (ANE SToken tkn))
|
||||
Nothing -> VRVerified (Nothing, NtfReqNew corrId (ANE SToken tkn))
|
||||
else VRFailed
|
||||
NtfCmd SToken c -> do
|
||||
t_ <- atomically $ getNtfToken st entId
|
||||
t_ <- liftIO $ getNtfTokenIO st entId
|
||||
verifyToken t_ (`verifiedTknCmd` c)
|
||||
NtfCmd SSubscription c@(SNEW sub@(NewNtfSub tknId smpQueue _)) -> do
|
||||
s_ <- atomically $ findNtfSubscription st smpQueue
|
||||
case s_ of
|
||||
Nothing -> do
|
||||
t_ <- atomically $ getActiveNtfToken st tknId
|
||||
verifyToken' t_ $ VRVerified (NtfReqNew corrId (ANE SSubscription sub))
|
||||
verifyToken' t_ $ VRVerified (t_, NtfReqNew corrId (ANE SSubscription sub))
|
||||
Just s@NtfSubData {tokenId = subTknId} ->
|
||||
if subTknId == tknId
|
||||
then do
|
||||
t_ <- atomically $ getActiveNtfToken st subTknId
|
||||
verifyToken' t_ $ verifiedSubCmd s c
|
||||
verifyToken' t_ $ verifiedSubCmd t_ s c
|
||||
else pure $ maybe False (dummyVerifyCmd auth_ authorized) tAuth `seq` VRFailed
|
||||
NtfCmd SSubscription PING -> pure $ VRVerified $ NtfReqPing corrId entId
|
||||
NtfCmd SSubscription PING -> pure $ VRVerified (Nothing, NtfReqPing corrId entId)
|
||||
NtfCmd SSubscription c -> do
|
||||
s_ <- atomically $ getNtfSubscription st entId
|
||||
s_ <- liftIO $ getNtfSubscriptionIO st entId
|
||||
case s_ of
|
||||
Just s@NtfSubData {tokenId = subTknId} -> do
|
||||
t_ <- atomically $ getActiveNtfToken st subTknId
|
||||
verifyToken' t_ $ verifiedSubCmd s c
|
||||
verifyToken' t_ $ verifiedSubCmd t_ s c
|
||||
_ -> pure $ maybe False (dummyVerifyCmd auth_ authorized) tAuth `seq` VRFailed
|
||||
where
|
||||
verifiedTknCmd t c = VRVerified (NtfReqCmd SToken (NtfTkn t) (corrId, entId, c))
|
||||
verifiedSubCmd s c = VRVerified (NtfReqCmd SSubscription (NtfSub s) (corrId, entId, c))
|
||||
verifiedTknCmd t c = VRVerified (Just t, NtfReqCmd SToken (NtfTkn t) (corrId, entId, c))
|
||||
verifiedSubCmd t_ s c = VRVerified (t_, NtfReqCmd SSubscription (NtfSub s) (corrId, entId, c))
|
||||
verifyToken :: Maybe NtfTknData -> (NtfTknData -> VerificationResult) -> M VerificationResult
|
||||
verifyToken t_ positiveVerificationResult =
|
||||
pure $ case t_ of
|
||||
@@ -579,11 +586,18 @@ verifyNtfTransmission auth_ (tAuth, authorized, (corrId, entId, _)) cmd = do
|
||||
|
||||
client :: NtfServerClient -> NtfSubscriber -> NtfPushServer -> M ()
|
||||
client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPushServer {pushQ, intervalNotifiers} =
|
||||
forever $
|
||||
forever $ do
|
||||
ts <- liftIO getSystemDate
|
||||
atomically (readTBQueue rcvQ)
|
||||
>>= mapM processCommand
|
||||
>>= mapM (\(tkn_, req) -> updateTokenDate ts tkn_ >> processCommand req)
|
||||
>>= atomically . writeTBQueue sndQ
|
||||
where
|
||||
updateTokenDate :: RoundedSystemTime -> Maybe NtfTknData -> M ()
|
||||
updateTokenDate ts' = mapM_ $ \NtfTknData {ntfTknId, tknUpdatedAt} -> do
|
||||
let t' = Just ts'
|
||||
t <- atomically $ swapTVar tknUpdatedAt t'
|
||||
unless (t' == t) $ withNtfLog $ \s -> logUpdateTokenTime s ntfTknId ts'
|
||||
|
||||
processCommand :: NtfRequest -> M (Transmission NtfResponse)
|
||||
processCommand = \case
|
||||
NtfReqNew corrId (ANE SToken newTkn@(NewNtfTkn token _ dhPubKey)) -> do
|
||||
@@ -593,7 +607,8 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
|
||||
let dhSecret = C.dh' dhPubKey srvDhPrivKey
|
||||
tknId <- getId
|
||||
regCode <- getRegCode
|
||||
tkn <- atomically $ mkNtfTknData tknId newTkn ks dhSecret regCode
|
||||
ts <- liftIO $ getSystemDate
|
||||
tkn <- liftIO $ mkNtfTknData tknId newTkn ks dhSecret regCode ts
|
||||
atomically $ addNtfToken st tknId tkn
|
||||
atomically $ writeTBQueue pushQ (tkn, PNVerification regCode)
|
||||
withNtfLog (`logCreateToken` tkn)
|
||||
|
||||
@@ -159,7 +159,7 @@ data NtfRequest
|
||||
| NtfReqPing CorrId NtfEntityId
|
||||
|
||||
data NtfServerClient = NtfServerClient
|
||||
{ rcvQ :: TBQueue (NonEmpty NtfRequest),
|
||||
{ rcvQ :: TBQueue (NonEmpty (Maybe NtfTknData, NtfRequest)),
|
||||
sndQ :: TBQueue (NonEmpty (Transmission NtfResponse)),
|
||||
ntfThParams :: THandleParams NTFVersion 'TServer,
|
||||
connected :: TVar Bool,
|
||||
|
||||
@@ -308,7 +308,7 @@ data PushProviderError
|
||||
= PPConnection HTTP2ClientError
|
||||
| PPCryptoError C.CryptoError
|
||||
| PPResponseError (Maybe Status) Text
|
||||
| PPTokenInvalid
|
||||
| PPTokenInvalid NTInvalidReason
|
||||
| PPRetryLater
|
||||
| PPPermanentError
|
||||
deriving (Show, Exception)
|
||||
@@ -338,15 +338,15 @@ apnsPushProviderClient c@APNSPushClient {nonceDrg, apnsCfg} tkn@NtfTknData {toke
|
||||
| status == Just N.ok200 = pure ()
|
||||
| status == Just N.badRequest400 =
|
||||
case reason' of
|
||||
"BadDeviceToken" -> throwE PPTokenInvalid
|
||||
"DeviceTokenNotForTopic" -> throwE PPTokenInvalid
|
||||
"BadDeviceToken" -> throwE $ PPTokenInvalid NTIRBadToken
|
||||
"DeviceTokenNotForTopic" -> throwE $ PPTokenInvalid NTIRTokenNotForTopic
|
||||
"TopicDisallowed" -> throwE PPPermanentError
|
||||
_ -> err status reason'
|
||||
| status == Just N.forbidden403 = case reason' of
|
||||
"ExpiredProviderToken" -> throwE PPPermanentError -- there should be no point retrying it as the token was refreshed
|
||||
"InvalidProviderToken" -> throwE PPPermanentError
|
||||
_ -> err status reason'
|
||||
| status == Just N.gone410 = throwE PPTokenInvalid
|
||||
| status == Just N.gone410 = throwE $ PPTokenInvalid NTIRGone410
|
||||
| status == Just N.serviceUnavailable503 = liftIO (disconnectApnsHTTP2Client c) >> throwE PPRetryLater
|
||||
-- Just tooManyRequests429 -> TooManyRequests - too many requests for the same token
|
||||
| otherwise = err status reason'
|
||||
|
||||
@@ -25,6 +25,7 @@ import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Notifications.Protocol
|
||||
import Simplex.Messaging.Protocol (NtfPrivateAuthKey, NtfPublicAuthKey, SMPServer)
|
||||
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime)
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Util (whenM, ($>>=))
|
||||
@@ -57,14 +58,16 @@ data NtfTknData = NtfTknData
|
||||
tknDhKeys :: C.KeyPair 'C.X25519,
|
||||
tknDhSecret :: C.DhSecretX25519,
|
||||
tknRegCode :: NtfRegCode,
|
||||
tknCronInterval :: TVar Word16
|
||||
tknCronInterval :: TVar Word16,
|
||||
tknUpdatedAt :: TVar (Maybe RoundedSystemTime)
|
||||
}
|
||||
|
||||
mkNtfTknData :: NtfTokenId -> NewNtfEntity 'Token -> C.KeyPair 'C.X25519 -> C.DhSecretX25519 -> NtfRegCode -> STM NtfTknData
|
||||
mkNtfTknData ntfTknId (NewNtfTkn token tknVerifyKey _) tknDhKeys tknDhSecret tknRegCode = do
|
||||
tknStatus <- newTVar NTRegistered
|
||||
tknCronInterval <- newTVar 0
|
||||
pure NtfTknData {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval}
|
||||
mkNtfTknData :: NtfTokenId -> NewNtfEntity 'Token -> C.KeyPair 'C.X25519 -> C.DhSecretX25519 -> NtfRegCode -> RoundedSystemTime -> IO NtfTknData
|
||||
mkNtfTknData ntfTknId (NewNtfTkn token tknVerifyKey _) tknDhKeys tknDhSecret tknRegCode ts = do
|
||||
tknStatus <- newTVarIO NTRegistered
|
||||
tknCronInterval <- newTVarIO 0
|
||||
tknUpdatedAt <- newTVarIO $ Just ts
|
||||
pure NtfTknData {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval, tknUpdatedAt}
|
||||
|
||||
data NtfSubData = NtfSubData
|
||||
{ ntfSubId :: NtfSubscriptionId,
|
||||
@@ -156,9 +159,8 @@ deleteTokenSubs st tknId = do
|
||||
$>>= \NtfSubData {smpQueue} ->
|
||||
TM.delete smpQueue (subscriptionLookup st) $> Just smpQueue
|
||||
|
||||
getNtfSubscription :: NtfStore -> NtfSubscriptionId -> STM (Maybe NtfSubData)
|
||||
getNtfSubscription st subId =
|
||||
TM.lookup subId (subscriptions st)
|
||||
getNtfSubscriptionIO :: NtfStore -> NtfSubscriptionId -> IO (Maybe NtfSubData)
|
||||
getNtfSubscriptionIO st subId = TM.lookupIO subId (subscriptions st)
|
||||
|
||||
findNtfSubscription :: NtfStore -> SMPQueueNtf -> STM (Maybe NtfSubData)
|
||||
findNtfSubscription st smpQueue = do
|
||||
|
||||
@@ -16,6 +16,7 @@ module Simplex.Messaging.Notifications.Server.StoreLog
|
||||
logUpdateToken,
|
||||
logTokenCron,
|
||||
logDeleteToken,
|
||||
logUpdateTokenTime,
|
||||
logCreateSubscription,
|
||||
logSubscriptionStatus,
|
||||
logDeleteSubscription,
|
||||
@@ -23,6 +24,7 @@ module Simplex.Messaging.Notifications.Server.StoreLog
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Applicative (optional)
|
||||
import Control.Concurrent.STM
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad
|
||||
@@ -36,6 +38,7 @@ import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Notifications.Protocol
|
||||
import Simplex.Messaging.Notifications.Server.Store
|
||||
import Simplex.Messaging.Protocol (NtfPrivateAuthKey)
|
||||
import Simplex.Messaging.Server.QueueStore (RoundedSystemTime)
|
||||
import Simplex.Messaging.Server.StoreLog
|
||||
import Simplex.Messaging.Util (safeDecodeUtf8)
|
||||
import System.IO
|
||||
@@ -46,6 +49,7 @@ data NtfStoreLogRecord
|
||||
| UpdateToken NtfTokenId DeviceToken NtfRegCode
|
||||
| TokenCron NtfTokenId Word16
|
||||
| DeleteToken NtfTokenId
|
||||
| UpdateTokenTime NtfTokenId RoundedSystemTime
|
||||
| CreateSubscription NtfSubRec
|
||||
| SubscriptionStatus NtfSubscriptionId NtfSubStatus
|
||||
| DeleteSubscription NtfSubscriptionId
|
||||
@@ -59,21 +63,24 @@ data NtfTknRec = NtfTknRec
|
||||
tknDhKeys :: C.KeyPair 'C.X25519,
|
||||
tknDhSecret :: C.DhSecretX25519,
|
||||
tknRegCode :: NtfRegCode,
|
||||
tknCronInterval :: Word16
|
||||
tknCronInterval :: Word16,
|
||||
tknUpdatedAt :: Maybe RoundedSystemTime
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
mkTknData :: NtfTknRec -> STM NtfTknData
|
||||
mkTknData NtfTknRec {ntfTknId, token, tknStatus = status, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval = cronInt} = do
|
||||
tknStatus <- newTVar status
|
||||
tknCronInterval <- newTVar cronInt
|
||||
pure NtfTknData {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval}
|
||||
mkTknData :: NtfTknRec -> IO NtfTknData
|
||||
mkTknData NtfTknRec {ntfTknId, token, tknStatus = status, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval = cronInt, tknUpdatedAt = updatedAt} = do
|
||||
tknStatus <- newTVarIO status
|
||||
tknCronInterval <- newTVarIO cronInt
|
||||
tknUpdatedAt <- newTVarIO updatedAt
|
||||
pure NtfTknData {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval, tknUpdatedAt}
|
||||
|
||||
mkTknRec :: NtfTknData -> STM NtfTknRec
|
||||
mkTknRec NtfTknData {ntfTknId, token, tknStatus = status, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval = cronInt} = do
|
||||
tknStatus <- readTVar status
|
||||
tknCronInterval <- readTVar cronInt
|
||||
pure NtfTknRec {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval}
|
||||
mkTknRec :: NtfTknData -> IO NtfTknRec
|
||||
mkTknRec NtfTknData {ntfTknId, token, tknStatus = status, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval = cronInt, tknUpdatedAt = updatedAt} = do
|
||||
tknStatus <- readTVarIO status
|
||||
tknCronInterval <- readTVarIO cronInt
|
||||
tknUpdatedAt <- readTVarIO updatedAt
|
||||
pure NtfTknRec {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval, tknUpdatedAt}
|
||||
|
||||
data NtfSubRec = NtfSubRec
|
||||
{ ntfSubId :: NtfSubscriptionId,
|
||||
@@ -84,9 +91,9 @@ data NtfSubRec = NtfSubRec
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
mkSubData :: NtfSubRec -> STM NtfSubData
|
||||
mkSubData :: NtfSubRec -> IO NtfSubData
|
||||
mkSubData NtfSubRec {ntfSubId, smpQueue, notifierKey, tokenId, subStatus = status} = do
|
||||
subStatus <- newTVar status
|
||||
subStatus <- newTVarIO status
|
||||
pure NtfSubData {ntfSubId, smpQueue, notifierKey, tokenId, subStatus}
|
||||
|
||||
mkSubRec :: NtfSubData -> STM NtfSubRec
|
||||
@@ -101,6 +108,7 @@ instance StrEncoding NtfStoreLogRecord where
|
||||
UpdateToken tknId token regCode -> strEncode (Str "TUPDATE", tknId, token, regCode)
|
||||
TokenCron tknId cronInt -> strEncode (Str "TCRON", tknId, cronInt)
|
||||
DeleteToken tknId -> strEncode (Str "TDELETE", tknId)
|
||||
UpdateTokenTime tknId ts -> strEncode (Str "TTIME", tknId, ts)
|
||||
CreateSubscription subRec -> strEncode (Str "SCREATE", subRec)
|
||||
SubscriptionStatus subId subStatus -> strEncode (Str "SSTATUS", subId, subStatus)
|
||||
DeleteSubscription subId -> strEncode (Str "SDELETE", subId)
|
||||
@@ -111,13 +119,14 @@ instance StrEncoding NtfStoreLogRecord where
|
||||
"TUPDATE " *> (UpdateToken <$> strP_ <*> strP_ <*> strP),
|
||||
"TCRON " *> (TokenCron <$> strP_ <*> strP),
|
||||
"TDELETE " *> (DeleteToken <$> strP),
|
||||
"TTIME " *> (UpdateTokenTime <$> strP_ <*> strP),
|
||||
"SCREATE " *> (CreateSubscription <$> strP),
|
||||
"SSTATUS " *> (SubscriptionStatus <$> strP_ <*> strP),
|
||||
"SDELETE " *> (DeleteSubscription <$> strP)
|
||||
]
|
||||
|
||||
instance StrEncoding NtfTknRec where
|
||||
strEncode NtfTknRec {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval} =
|
||||
strEncode NtfTknRec {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval, tknUpdatedAt} =
|
||||
B.unwords
|
||||
[ "tknId=" <> strEncode ntfTknId,
|
||||
"token=" <> strEncode token,
|
||||
@@ -128,6 +137,9 @@ instance StrEncoding NtfTknRec where
|
||||
"regCode=" <> strEncode tknRegCode,
|
||||
"cron=" <> strEncode tknCronInterval
|
||||
]
|
||||
<> maybe "" updatedAtStr tknUpdatedAt
|
||||
where
|
||||
updatedAtStr t = " updatedAt=" <> strEncode t
|
||||
strP = do
|
||||
ntfTknId <- "tknId=" *> strP_
|
||||
token <- "token=" *> strP_
|
||||
@@ -137,7 +149,8 @@ instance StrEncoding NtfTknRec where
|
||||
tknDhSecret <- "dhSecret=" *> strP_
|
||||
tknRegCode <- "regCode=" *> strP_
|
||||
tknCronInterval <- "cron=" *> strP
|
||||
pure NtfTknRec {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval}
|
||||
tknUpdatedAt <- optional $ " updatedAt=" *> strP
|
||||
pure NtfTknRec {ntfTknId, token, tknStatus, tknVerifyKey, tknDhKeys, tknDhSecret, tknRegCode, tknCronInterval, tknUpdatedAt}
|
||||
|
||||
instance StrEncoding NtfSubRec where
|
||||
strEncode NtfSubRec {ntfSubId, smpQueue, notifierKey, tokenId, subStatus} =
|
||||
@@ -161,7 +174,7 @@ logNtfStoreRecord = writeStoreLogRecord
|
||||
{-# INLINE logNtfStoreRecord #-}
|
||||
|
||||
logCreateToken :: StoreLog 'WriteMode -> NtfTknData -> IO ()
|
||||
logCreateToken s tkn = logNtfStoreRecord s . CreateToken =<< atomically (mkTknRec tkn)
|
||||
logCreateToken s tkn = logNtfStoreRecord s . CreateToken =<< mkTknRec tkn
|
||||
|
||||
logTokenStatus :: StoreLog 'WriteMode -> NtfTokenId -> NtfTknStatus -> IO ()
|
||||
logTokenStatus s tknId tknStatus = logNtfStoreRecord s $ TokenStatus tknId tknStatus
|
||||
@@ -175,6 +188,9 @@ logTokenCron s tknId cronInt = logNtfStoreRecord s $ TokenCron tknId cronInt
|
||||
logDeleteToken :: StoreLog 'WriteMode -> NtfTokenId -> IO ()
|
||||
logDeleteToken s tknId = logNtfStoreRecord s $ DeleteToken tknId
|
||||
|
||||
logUpdateTokenTime :: StoreLog 'WriteMode -> NtfTokenId -> RoundedSystemTime -> IO ()
|
||||
logUpdateTokenTime s tknId t = logNtfStoreRecord s $ UpdateTokenTime tknId t
|
||||
|
||||
logCreateSubscription :: StoreLog 'WriteMode -> NtfSubData -> IO ()
|
||||
logCreateSubscription s sub = logNtfStoreRecord s . CreateSubscription =<< atomically (mkSubRec sub)
|
||||
|
||||
@@ -192,36 +208,39 @@ readNtfStore f st = mapM_ (addNtfLogRecord . LB.toStrict) . LB.lines =<< LB.read
|
||||
where
|
||||
addNtfLogRecord s = case strDecode s of
|
||||
Left e -> logError $ "Log parsing error (" <> T.pack e <> "): " <> safeDecodeUtf8 (B.take 100 s)
|
||||
Right lr -> atomically $ case lr of
|
||||
Right lr -> case lr of
|
||||
CreateToken r@NtfTknRec {ntfTknId} -> do
|
||||
tkn <- mkTknData r
|
||||
addNtfToken st ntfTknId tkn
|
||||
atomically $ addNtfToken st ntfTknId tkn
|
||||
TokenStatus tknId status -> do
|
||||
tkn_ <- getNtfToken st tknId
|
||||
tkn_ <- getNtfTokenIO st tknId
|
||||
forM_ tkn_ $ \tkn@NtfTknData {tknStatus} -> do
|
||||
writeTVar tknStatus status
|
||||
when (status == NTActive) $ void $ removeInactiveTokenRegistrations st tkn
|
||||
UpdateToken tknId token' tknRegCode ->
|
||||
getNtfToken st tknId
|
||||
atomically $ writeTVar tknStatus status
|
||||
when (status == NTActive) $ void $ atomically $ removeInactiveTokenRegistrations st tkn
|
||||
UpdateToken tknId token' tknRegCode -> do
|
||||
getNtfTokenIO st tknId
|
||||
>>= mapM_
|
||||
( \tkn@NtfTknData {tknStatus} -> do
|
||||
removeTokenRegistration st tkn
|
||||
writeTVar tknStatus NTRegistered
|
||||
addNtfToken st tknId tkn {token = token', tknRegCode}
|
||||
atomically $ removeTokenRegistration st tkn
|
||||
atomically $ writeTVar tknStatus NTRegistered
|
||||
atomically $ addNtfToken st tknId tkn {token = token', tknRegCode}
|
||||
)
|
||||
TokenCron tknId cronInt ->
|
||||
getNtfToken st tknId
|
||||
>>= mapM_ (\NtfTknData {tknCronInterval} -> writeTVar tknCronInterval cronInt)
|
||||
getNtfTokenIO st tknId
|
||||
>>= mapM_ (\NtfTknData {tknCronInterval} -> atomically $ writeTVar tknCronInterval cronInt)
|
||||
DeleteToken tknId ->
|
||||
void $ deleteNtfToken st tknId
|
||||
atomically $ void $ deleteNtfToken st tknId
|
||||
UpdateTokenTime tknId t ->
|
||||
getNtfTokenIO st tknId
|
||||
>>= mapM_ (\NtfTknData {tknUpdatedAt} -> atomically $ writeTVar tknUpdatedAt $ Just t)
|
||||
CreateSubscription r@NtfSubRec {ntfSubId} -> do
|
||||
sub <- mkSubData r
|
||||
void $ addNtfSubscription st ntfSubId sub
|
||||
SubscriptionStatus subId status ->
|
||||
getNtfSubscription st subId
|
||||
>>= mapM_ (\NtfSubData {subStatus} -> writeTVar subStatus status)
|
||||
void $ atomically $ addNtfSubscription st ntfSubId sub
|
||||
SubscriptionStatus subId status -> do
|
||||
getNtfSubscriptionIO st subId
|
||||
>>= mapM_ (\NtfSubData {subStatus} -> atomically $ writeTVar subStatus status)
|
||||
DeleteSubscription subId ->
|
||||
deleteNtfSubscription st subId
|
||||
atomically $ deleteNtfSubscription st subId
|
||||
|
||||
writeNtfStore :: StoreLog 'WriteMode -> NtfStore -> IO ()
|
||||
writeNtfStore s NtfStore {tokens, subscriptions} = do
|
||||
|
||||
@@ -44,11 +44,14 @@ initialNTFVersion = VersionNTF 1
|
||||
authBatchCmdsNTFVersion :: VersionNTF
|
||||
authBatchCmdsNTFVersion = VersionNTF 2
|
||||
|
||||
invalidReasonNTFVersion :: VersionNTF
|
||||
invalidReasonNTFVersion = VersionNTF 3
|
||||
|
||||
currentClientNTFVersion :: VersionNTF
|
||||
currentClientNTFVersion = VersionNTF 2
|
||||
currentClientNTFVersion = VersionNTF 3
|
||||
|
||||
currentServerNTFVersion :: VersionNTF
|
||||
currentServerNTFVersion = VersionNTF 2
|
||||
currentServerNTFVersion = VersionNTF 3
|
||||
|
||||
supportedClientNTFVRange :: VersionRangeNTF
|
||||
supportedClientNTFVRange = mkVersionRange initialNTFVersion currentClientNTFVersion
|
||||
|
||||
Reference in New Issue
Block a user