mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-29 23:10:00 +00:00
get multiple messages when notification is processed (#411)
* get multiple messages when notification is processed * change notification property
This commit is contained in:
committed by
GitHub
parent
d1db7d6f79
commit
0ab44b1836
@@ -98,10 +98,10 @@ import Simplex.Messaging.Notifications.Client
|
||||
import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfRegCode), NtfTknStatus (..))
|
||||
import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..))
|
||||
import Simplex.Messaging.Parsers (parse)
|
||||
import Simplex.Messaging.Protocol (BrokerMsg, ErrorType (AUTH), MsgBody, MsgFlags, NMsgMeta (..))
|
||||
import Simplex.Messaging.Protocol (BrokerMsg, ErrorType (AUTH), MsgBody, MsgFlags, SMPMsgMeta)
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Util (bshow, liftE, liftError, tryError, unlessM, whenM, ($>>=))
|
||||
import Simplex.Messaging.Util (bshow, eitherToMaybe, liftE, liftError, tryError, unlessM, whenM, ($>>=))
|
||||
import Simplex.Messaging.Version
|
||||
import System.Random (randomR)
|
||||
import UnliftIO.Async (async, race_)
|
||||
@@ -155,11 +155,11 @@ subscribeConnection :: AgentErrorMonad m => AgentClient -> ConnId -> m ()
|
||||
subscribeConnection c = withAgentEnv c . subscribeConnection' c
|
||||
|
||||
-- | Get connection message (GET command)
|
||||
getConnectionMessage :: AgentErrorMonad m => AgentClient -> ConnId -> m (Maybe (SMP.MsgId, MsgFlags))
|
||||
getConnectionMessage :: AgentErrorMonad m => AgentClient -> ConnId -> m (Maybe SMPMsgMeta)
|
||||
getConnectionMessage c = withAgentEnv c . getConnectionMessage' c
|
||||
|
||||
-- | Get connection message for received notification
|
||||
getNotificationMessage :: AgentErrorMonad m => AgentClient -> ByteString -> C.CbNonce -> m (Maybe (SMP.MsgId, MsgFlags))
|
||||
getNotificationMessage :: AgentErrorMonad m => AgentClient -> C.CbNonce -> ByteString -> m (NotificationInfo, [SMPMsgMeta])
|
||||
getNotificationMessage c = withAgentEnv c .: getNotificationMessage' c
|
||||
|
||||
resubscribeConnection :: AgentErrorMonad m => AgentClient -> ConnId -> m ()
|
||||
@@ -386,7 +386,7 @@ resubscribeConnection' c connId =
|
||||
(atomically $ hasActiveSubscription c connId)
|
||||
(subscribeConnection' c connId)
|
||||
|
||||
getConnectionMessage' :: AgentMonad m => AgentClient -> ConnId -> m (Maybe (SMP.MsgId, MsgFlags))
|
||||
getConnectionMessage' :: AgentMonad m => AgentClient -> ConnId -> m (Maybe SMPMsgMeta)
|
||||
getConnectionMessage' c connId = do
|
||||
whenM (atomically $ hasActiveSubscription c connId) . throwError $ CMD PROHIBITED
|
||||
withStore c (`getConn` connId) >>= \case
|
||||
@@ -395,20 +395,32 @@ getConnectionMessage' c connId = do
|
||||
SomeConn _ ContactConnection {} -> throwError $ CMD PROHIBITED
|
||||
SomeConn _ SndConnection {} -> throwError $ CONN SIMPLEX
|
||||
|
||||
getNotificationMessage' :: forall m. AgentMonad m => AgentClient -> ByteString -> C.CbNonce -> m (Maybe (SMP.MsgId, MsgFlags))
|
||||
getNotificationMessage' c encMessageInfo nonce = do
|
||||
getNotificationMessage' :: forall m. AgentMonad m => AgentClient -> C.CbNonce -> ByteString -> m (NotificationInfo, [SMPMsgMeta])
|
||||
getNotificationMessage' c nonce encNtfInfo = do
|
||||
withStore' c getActiveNtfToken >>= \case
|
||||
Just NtfToken {ntfDhSecret = Just dhSecret} -> do
|
||||
ntfData <- agentCbDecrypt dhSecret nonce encMessageInfo
|
||||
ntfData <- agentCbDecrypt dhSecret nonce encNtfInfo
|
||||
PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} <- liftEither (parse strP (INTERNAL "error parsing PNMessageData") ntfData)
|
||||
(connId, rcvDhSecret) <- withStore c (`getNtfRcvQueue` smpQueue)
|
||||
nMsgMeta <- agentCbDecrypt rcvDhSecret nmsgNonce encNMsgMeta `catchError` const (pure "")
|
||||
let nMsgMetaParsed = parse smpP (INTERNAL "error parsing NMsgMeta") nMsgMeta
|
||||
case nMsgMetaParsed of
|
||||
Right NMsgMeta {msgId, msgTs} -> liftIO . print $ "getNotificationMessage', ntfTs = " <> show ntfTs <> ", msgId = " <> show msgId <> ", msgTs = " <> show msgTs
|
||||
Left _ -> liftIO . print $ "getNotificationMessage', ntfTs = " <> show ntfTs <> ", failed to parse NMsgMeta"
|
||||
getConnectionMessage' c connId
|
||||
(ntfConnId, rcvDhSecret) <- withStore c (`getNtfRcvQueue` smpQueue)
|
||||
ntfMsgMeta <- (eitherToMaybe . smpDecode <$> agentCbDecrypt rcvDhSecret nmsgNonce encNMsgMeta) `catchError` \_ -> pure Nothing
|
||||
maxMsgs <- asks $ ntfMaxMessages . config
|
||||
(NotificationInfo {ntfConnId, ntfTs, ntfMsgMeta},) <$> getNtfMessages ntfConnId maxMsgs ntfMsgMeta []
|
||||
_ -> throwError $ CMD PROHIBITED
|
||||
where
|
||||
getNtfMessages ntfConnId maxMs nMeta ms
|
||||
| length ms < maxMs =
|
||||
getConnectionMessage' c ntfConnId >>= \case
|
||||
Just m@SMP.SMPMsgMeta {msgId, msgTs, msgFlags} -> case nMeta of
|
||||
Just SMP.NMsgMeta {msgId = msgId', msgTs = msgTs'}
|
||||
| msgId == msgId' || msgTs > msgTs' -> pure $ reverse (m : ms)
|
||||
| otherwise -> getMsg (m : ms)
|
||||
_
|
||||
| SMP.notification msgFlags -> pure $ reverse (m : ms)
|
||||
| otherwise -> getMsg (m : ms)
|
||||
_ -> pure $ reverse ms
|
||||
| otherwise = pure $ reverse ms
|
||||
where
|
||||
getMsg = getNtfMessages ntfConnId maxMs nMeta
|
||||
|
||||
-- | Send message to the connection (SEND command) in Reader monad
|
||||
sendMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> MsgFlags -> MsgBody -> m AgentMsgId
|
||||
|
||||
@@ -85,7 +85,7 @@ import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Notifications.Client
|
||||
import Simplex.Messaging.Notifications.Protocol
|
||||
import Simplex.Messaging.Protocol (BrokerMsg, ErrorType, MsgFlags (..), MsgId, NotifierId, NtfPrivateSignKey, NtfPublicVerifyKey, ProtocolServer (..), QueueId, QueueIdsKeys (..), SndPublicVerifyKey)
|
||||
import Simplex.Messaging.Protocol (BrokerMsg, ErrorType, MsgFlags (..), MsgId, NotifierId, NtfPrivateSignKey, NtfPublicVerifyKey, ProtocolServer (..), QueueId, QueueIdsKeys (..), SMPMsgMeta, SndPublicVerifyKey)
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
@@ -508,7 +508,7 @@ sendInvitation c (Compatible SMPQueueInfo {smpServer, senderId, dhPublicKey}) co
|
||||
agentCbEncryptOnce dhPublicKey . smpEncode $
|
||||
SMP.ClientMessage SMP.PHEmpty $ smpEncode agentEnvelope
|
||||
|
||||
getQueueMessage :: AgentMonad m => AgentClient -> RcvQueue -> ConnId -> m (Maybe (MsgId, MsgFlags))
|
||||
getQueueMessage :: AgentMonad m => AgentClient -> RcvQueue -> ConnId -> m (Maybe SMPMsgMeta)
|
||||
getQueueMessage c@AgentClient {getMsgLocks} RcvQueue {server, rcvId, rcvPrivateKey} connId =
|
||||
E.bracket (atomically createTakeLock) (atomically . (`putTMVar` ())) $ \_ ->
|
||||
withLogClient c server rcvId "GET" $ \smp ->
|
||||
|
||||
@@ -71,6 +71,7 @@ data AgentConfig = AgentConfig
|
||||
resubscriptionConcurrency :: Int,
|
||||
ntfWorkerThrottle :: Int,
|
||||
ntfSubCheckInterval :: NominalDiffTime,
|
||||
ntfMaxMessages :: Int,
|
||||
caCertificateFile :: FilePath,
|
||||
privateKeyFile :: FilePath,
|
||||
certificateFile :: FilePath,
|
||||
@@ -104,6 +105,7 @@ defaultAgentConfig =
|
||||
resubscriptionConcurrency = 16,
|
||||
ntfWorkerThrottle = 1000000, -- microseconds
|
||||
ntfSubCheckInterval = nominalDay,
|
||||
ntfMaxMessages = 4,
|
||||
-- CA certificate private key is not needed for initialization
|
||||
-- ! we do not generate these
|
||||
caCertificateFile = "/etc/opt/simplex-agent/ca.crt",
|
||||
|
||||
@@ -83,6 +83,7 @@ module Simplex.Messaging.Agent.Protocol
|
||||
ACorrId,
|
||||
AgentMsgId,
|
||||
AgentPhase (..),
|
||||
NotificationInfo (..),
|
||||
|
||||
-- * Encode/decode
|
||||
serializeCommand,
|
||||
@@ -119,6 +120,7 @@ import qualified Data.List.NonEmpty as L
|
||||
import Data.Maybe (isJust)
|
||||
import Data.Text (Text)
|
||||
import Data.Time.Clock (UTCTime)
|
||||
import Data.Time.Clock.System (SystemTime)
|
||||
import Data.Time.ISO8601
|
||||
import Data.Type.Equality
|
||||
import Data.Typeable ()
|
||||
@@ -135,6 +137,7 @@ import Simplex.Messaging.Protocol
|
||||
MsgBody,
|
||||
MsgFlags,
|
||||
MsgId,
|
||||
NMsgMeta,
|
||||
SMPServer,
|
||||
SndPublicVerifyKey,
|
||||
SrvLoc (..),
|
||||
@@ -252,6 +255,17 @@ instance StrEncoding AgentPhase where
|
||||
"SUSPENDED" -> pure APSuspended
|
||||
_ -> fail "bad AgentPhase"
|
||||
|
||||
instance ToJSON AgentPhase where
|
||||
toEncoding = strToJEncoding
|
||||
toJSON = strToJSON
|
||||
|
||||
data NotificationInfo = NotificationInfo
|
||||
{ ntfConnId :: ConnId,
|
||||
ntfTs :: SystemTime,
|
||||
ntfMsgMeta :: Maybe NMsgMeta
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
data ConnectionMode = CMInvitation | CMContact
|
||||
deriving (Eq, Show)
|
||||
|
||||
|
||||
@@ -291,13 +291,13 @@ subscribeSMPQueue c@ProtocolClient {protocolServer, sessionId, msgQ} rpKey rId =
|
||||
-- | Get message from SMP queue. The server returns ERR PROHIBITED if a client uses SUB and GET via the same transport connection for the same queue
|
||||
--
|
||||
-- https://github.covm/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#receive-a-message-from-the-queue
|
||||
getSMPMessage :: SMPClient -> RcvPrivateSignKey -> RecipientId -> ExceptT ProtocolClientError IO (Maybe (MsgId, MsgFlags))
|
||||
getSMPMessage :: SMPClient -> RcvPrivateSignKey -> RecipientId -> ExceptT ProtocolClientError IO (Maybe SMP.SMPMsgMeta)
|
||||
getSMPMessage c@ProtocolClient {protocolServer, sessionId, msgQ} rpKey rId =
|
||||
sendSMPCommand c (Just rpKey) rId GET >>= \case
|
||||
OK -> pure Nothing
|
||||
cmd@(MSG msgId _ msgFlags _) -> do
|
||||
cmd@(MSG msgId msgTs msgFlags _) -> do
|
||||
lift . atomically $ mapM_ (`writeTBQueue` (protocolServer, sessionId, rId, cmd)) msgQ
|
||||
pure $ Just (msgId, msgFlags)
|
||||
pure $ Just SMP.SMPMsgMeta {msgId, msgTs, msgFlags}
|
||||
_ -> throwE PCEUnexpectedResponse
|
||||
|
||||
-- | Subscribe to the SMP queue notifications.
|
||||
|
||||
@@ -272,10 +272,10 @@ apnsNotification :: NtfTknData -> C.CbNonce -> Int -> PushNotification -> Either
|
||||
apnsNotification NtfTknData {tknDhSecret} nonce paddedLen = \case
|
||||
PNVerification (NtfRegCode code) ->
|
||||
encrypt code $ \code' ->
|
||||
apn APNSBackground {contentAvailable = 1} . Just $ J.object ["verification" .= code', "nonce" .= nonce]
|
||||
apn APNSBackground {contentAvailable = 1} . Just $ J.object ["nonce" .= nonce, "verification" .= code']
|
||||
PNMessage pnMessageData ->
|
||||
encrypt (strEncode pnMessageData) $ \ntfData ->
|
||||
apn apnMutableContent . Just $ J.object ["checkMessage" .= ntfData, "nonce" .= nonce]
|
||||
apn apnMutableContent . Just $ J.object ["nonce" .= nonce, "message" .= ntfData]
|
||||
PNAlert text -> Right $ apn (apnAlert $ APNSAlertText text) Nothing
|
||||
PNCheckMessages -> Right $ apn APNSBackground {contentAvailable = 1} . Just $ J.object ["checkMessages" .= True]
|
||||
where
|
||||
|
||||
@@ -79,6 +79,7 @@ module Simplex.Messaging.Protocol
|
||||
MsgId,
|
||||
MsgBody,
|
||||
EncNMsgMeta,
|
||||
SMPMsgMeta (..),
|
||||
NMsgMeta (..),
|
||||
MsgFlags (..),
|
||||
noMsgFlags,
|
||||
@@ -252,10 +253,18 @@ data BrokerMsg where
|
||||
|
||||
type EncNMsgMeta = ByteString
|
||||
|
||||
data SMPMsgMeta = SMPMsgMeta
|
||||
{ msgId :: MsgId,
|
||||
msgTs :: SystemTime,
|
||||
msgFlags :: MsgFlags
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
data NMsgMeta = NMsgMeta
|
||||
{ msgId :: MsgId,
|
||||
msgTs :: SystemTime
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
instance Encoding NMsgMeta where
|
||||
smpEncode NMsgMeta {msgId, msgTs} =
|
||||
@@ -265,8 +274,10 @@ instance Encoding NMsgMeta where
|
||||
(msgId, msgTs, Tail _) <- smpP
|
||||
pure NMsgMeta {msgId, msgTs}
|
||||
|
||||
newtype MsgFlags = MsgFlags {notification :: Bool}
|
||||
deriving (Eq, Show)
|
||||
data MsgFlags = MsgFlags {notification :: Bool}
|
||||
deriving (Eq, Show, Generic)
|
||||
|
||||
instance ToJSON MsgFlags where toEncoding = J.genericToEncoding J.defaultOptions
|
||||
|
||||
instance Encoding MsgFlags where
|
||||
smpEncode MsgFlags {notification} = smpEncode notification
|
||||
|
||||
@@ -27,7 +27,7 @@ import Simplex.Messaging.Agent.Protocol
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Notifications.Protocol
|
||||
import Simplex.Messaging.Notifications.Server.Push.APNS
|
||||
import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgFlags (MsgFlags))
|
||||
import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgFlags (MsgFlags), SMPMsgMeta (..))
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.Transport (ATransport)
|
||||
import Simplex.Messaging.Util (tryE)
|
||||
@@ -189,7 +189,7 @@ testNotificationSubscriptionExistingConnection :: APNSMockServer -> IO ()
|
||||
testNotificationSubscriptionExistingConnection APNSMockServer {apnsQ} = do
|
||||
alice <- getSMPAgentClient agentCfg initAgentServers
|
||||
bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers
|
||||
Right (bobId, checkMessage, nonce) <- runExceptT $ do
|
||||
Right (bobId, nonce, message) <- runExceptT $ do
|
||||
-- establish connection
|
||||
(bobId, qInfo) <- createConnection alice SCMInvitation
|
||||
aliceId <- joinConnection bob qInfo "bob's connInfo"
|
||||
@@ -213,16 +213,16 @@ testNotificationSubscriptionExistingConnection APNSMockServer {apnsQ} = do
|
||||
1 <- msgId <$> sendMessage bob aliceId (SMP.MsgFlags True) "hello"
|
||||
get bob ##> ("", aliceId, SENT $ baseId + 1)
|
||||
-- notification
|
||||
(checkMessage, nonce) <- messageNotification apnsQ
|
||||
pure (bobId, checkMessage, nonce)
|
||||
(nonce, message) <- messageNotification apnsQ
|
||||
pure (bobId, nonce, message)
|
||||
|
||||
-- alice client already has subscription for the connection
|
||||
Left (CMD PROHIBITED) <- runExceptT $ getNotificationMessage alice checkMessage nonce
|
||||
Left (CMD PROHIBITED) <- runExceptT $ getNotificationMessage alice nonce message
|
||||
|
||||
-- aliceNtf client doesn't have subscription and is allowed to get notification message
|
||||
aliceNtf <- getSMPAgentClient agentCfg initAgentServers
|
||||
Right () <- runExceptT $ do
|
||||
Just (_msgId, MsgFlags True) <- getNotificationMessage aliceNtf checkMessage nonce
|
||||
(_, [SMPMsgMeta {msgFlags = MsgFlags True}]) <- getNotificationMessage aliceNtf nonce message
|
||||
pure ()
|
||||
disconnectAgentClient aliceNtf
|
||||
|
||||
@@ -299,13 +299,13 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do
|
||||
baseId = 3
|
||||
msgId = subtract baseId
|
||||
|
||||
messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (ByteString, C.CbNonce)
|
||||
messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString)
|
||||
messageNotification apnsQ = do
|
||||
500000 `timeout` atomically (readTBQueue apnsQ) >>= \case
|
||||
Nothing -> error "no notification"
|
||||
Just APNSMockRequest {notification = APNSNotification {aps = APNSMutableContent {}, notificationData = Just ntfData}, sendApnsResponse} -> do
|
||||
checkMessage <- ntfData .-> "checkMessage"
|
||||
nonce <- C.cbNonce <$> ntfData .-> "nonce"
|
||||
message <- ntfData .-> "message"
|
||||
liftIO $ sendApnsResponse APNSRespOk
|
||||
pure (checkMessage, nonce)
|
||||
pure (nonce, message)
|
||||
_ -> error "bad notification"
|
||||
|
||||
@@ -118,9 +118,9 @@ testNotificationSubscription (ATransport t) =
|
||||
-- receive notification
|
||||
APNSMockRequest {notification, sendApnsResponse = send'} <- atomically $ readTBQueue apnsQ
|
||||
let APNSNotification {aps = APNSMutableContent {}, notificationData = Just ntfData'} = notification
|
||||
Right checkMessage = ntfData' .-> "checkMessage"
|
||||
Right nonce' = C.cbNonce <$> ntfData' .-> "nonce"
|
||||
Right ntfDataDecrypted = C.cbDecrypt dhSecret nonce' checkMessage
|
||||
Right message = ntfData' .-> "message"
|
||||
Right ntfDataDecrypted = C.cbDecrypt dhSecret nonce' message
|
||||
Right APNS.PNMessageData {smpQueue = SMPQueueNtf {smpServer, notifierId}, nmsgNonce, encNMsgMeta} =
|
||||
parse strP (AP.INTERNAL "error parsing PNMessageData") ntfDataDecrypted
|
||||
Right nMsgMeta = C.cbDecrypt rcvDhSecret nmsgNonce encNMsgMeta
|
||||
|
||||
Reference in New Issue
Block a user