diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 12441a15d..8ea17b5c1 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -98,10 +98,10 @@ import Simplex.Messaging.Notifications.Client import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfRegCode), NtfTknStatus (..)) import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..)) import Simplex.Messaging.Parsers (parse) -import Simplex.Messaging.Protocol (BrokerMsg, ErrorType (AUTH), MsgBody, MsgFlags, NMsgMeta (..)) +import Simplex.Messaging.Protocol (BrokerMsg, ErrorType (AUTH), MsgBody, MsgFlags, SMPMsgMeta) import qualified Simplex.Messaging.Protocol as SMP import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Util (bshow, liftE, liftError, tryError, unlessM, whenM, ($>>=)) +import Simplex.Messaging.Util (bshow, eitherToMaybe, liftE, liftError, tryError, unlessM, whenM, ($>>=)) import Simplex.Messaging.Version import System.Random (randomR) import UnliftIO.Async (async, race_) @@ -155,11 +155,11 @@ subscribeConnection :: AgentErrorMonad m => AgentClient -> ConnId -> m () subscribeConnection c = withAgentEnv c . subscribeConnection' c -- | Get connection message (GET command) -getConnectionMessage :: AgentErrorMonad m => AgentClient -> ConnId -> m (Maybe (SMP.MsgId, MsgFlags)) +getConnectionMessage :: AgentErrorMonad m => AgentClient -> ConnId -> m (Maybe SMPMsgMeta) getConnectionMessage c = withAgentEnv c . getConnectionMessage' c -- | Get connection message for received notification -getNotificationMessage :: AgentErrorMonad m => AgentClient -> ByteString -> C.CbNonce -> m (Maybe (SMP.MsgId, MsgFlags)) +getNotificationMessage :: AgentErrorMonad m => AgentClient -> C.CbNonce -> ByteString -> m (NotificationInfo, [SMPMsgMeta]) getNotificationMessage c = withAgentEnv c .: getNotificationMessage' c resubscribeConnection :: AgentErrorMonad m => AgentClient -> ConnId -> m () @@ -386,7 +386,7 @@ resubscribeConnection' c connId = (atomically $ hasActiveSubscription c connId) (subscribeConnection' c connId) -getConnectionMessage' :: AgentMonad m => AgentClient -> ConnId -> m (Maybe (SMP.MsgId, MsgFlags)) +getConnectionMessage' :: AgentMonad m => AgentClient -> ConnId -> m (Maybe SMPMsgMeta) getConnectionMessage' c connId = do whenM (atomically $ hasActiveSubscription c connId) . throwError $ CMD PROHIBITED withStore c (`getConn` connId) >>= \case @@ -395,20 +395,32 @@ getConnectionMessage' c connId = do SomeConn _ ContactConnection {} -> throwError $ CMD PROHIBITED SomeConn _ SndConnection {} -> throwError $ CONN SIMPLEX -getNotificationMessage' :: forall m. AgentMonad m => AgentClient -> ByteString -> C.CbNonce -> m (Maybe (SMP.MsgId, MsgFlags)) -getNotificationMessage' c encMessageInfo nonce = do +getNotificationMessage' :: forall m. AgentMonad m => AgentClient -> C.CbNonce -> ByteString -> m (NotificationInfo, [SMPMsgMeta]) +getNotificationMessage' c nonce encNtfInfo = do withStore' c getActiveNtfToken >>= \case Just NtfToken {ntfDhSecret = Just dhSecret} -> do - ntfData <- agentCbDecrypt dhSecret nonce encMessageInfo + ntfData <- agentCbDecrypt dhSecret nonce encNtfInfo PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} <- liftEither (parse strP (INTERNAL "error parsing PNMessageData") ntfData) - (connId, rcvDhSecret) <- withStore c (`getNtfRcvQueue` smpQueue) - nMsgMeta <- agentCbDecrypt rcvDhSecret nmsgNonce encNMsgMeta `catchError` const (pure "") - let nMsgMetaParsed = parse smpP (INTERNAL "error parsing NMsgMeta") nMsgMeta - case nMsgMetaParsed of - Right NMsgMeta {msgId, msgTs} -> liftIO . print $ "getNotificationMessage', ntfTs = " <> show ntfTs <> ", msgId = " <> show msgId <> ", msgTs = " <> show msgTs - Left _ -> liftIO . print $ "getNotificationMessage', ntfTs = " <> show ntfTs <> ", failed to parse NMsgMeta" - getConnectionMessage' c connId + (ntfConnId, rcvDhSecret) <- withStore c (`getNtfRcvQueue` smpQueue) + ntfMsgMeta <- (eitherToMaybe . smpDecode <$> agentCbDecrypt rcvDhSecret nmsgNonce encNMsgMeta) `catchError` \_ -> pure Nothing + maxMsgs <- asks $ ntfMaxMessages . config + (NotificationInfo {ntfConnId, ntfTs, ntfMsgMeta},) <$> getNtfMessages ntfConnId maxMsgs ntfMsgMeta [] _ -> throwError $ CMD PROHIBITED + where + getNtfMessages ntfConnId maxMs nMeta ms + | length ms < maxMs = + getConnectionMessage' c ntfConnId >>= \case + Just m@SMP.SMPMsgMeta {msgId, msgTs, msgFlags} -> case nMeta of + Just SMP.NMsgMeta {msgId = msgId', msgTs = msgTs'} + | msgId == msgId' || msgTs > msgTs' -> pure $ reverse (m : ms) + | otherwise -> getMsg (m : ms) + _ + | SMP.notification msgFlags -> pure $ reverse (m : ms) + | otherwise -> getMsg (m : ms) + _ -> pure $ reverse ms + | otherwise = pure $ reverse ms + where + getMsg = getNtfMessages ntfConnId maxMs nMeta -- | Send message to the connection (SEND command) in Reader monad sendMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> MsgFlags -> MsgBody -> m AgentMsgId diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 6248cd321..f69813da5 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -85,7 +85,7 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding import Simplex.Messaging.Notifications.Client import Simplex.Messaging.Notifications.Protocol -import Simplex.Messaging.Protocol (BrokerMsg, ErrorType, MsgFlags (..), MsgId, NotifierId, NtfPrivateSignKey, NtfPublicVerifyKey, ProtocolServer (..), QueueId, QueueIdsKeys (..), SndPublicVerifyKey) +import Simplex.Messaging.Protocol (BrokerMsg, ErrorType, MsgFlags (..), MsgId, NotifierId, NtfPrivateSignKey, NtfPublicVerifyKey, ProtocolServer (..), QueueId, QueueIdsKeys (..), SMPMsgMeta, SndPublicVerifyKey) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -508,7 +508,7 @@ sendInvitation c (Compatible SMPQueueInfo {smpServer, senderId, dhPublicKey}) co agentCbEncryptOnce dhPublicKey . smpEncode $ SMP.ClientMessage SMP.PHEmpty $ smpEncode agentEnvelope -getQueueMessage :: AgentMonad m => AgentClient -> RcvQueue -> ConnId -> m (Maybe (MsgId, MsgFlags)) +getQueueMessage :: AgentMonad m => AgentClient -> RcvQueue -> ConnId -> m (Maybe SMPMsgMeta) getQueueMessage c@AgentClient {getMsgLocks} RcvQueue {server, rcvId, rcvPrivateKey} connId = E.bracket (atomically createTakeLock) (atomically . (`putTMVar` ())) $ \_ -> withLogClient c server rcvId "GET" $ \smp -> diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index ffd5af8b7..de3c01b33 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -71,6 +71,7 @@ data AgentConfig = AgentConfig resubscriptionConcurrency :: Int, ntfWorkerThrottle :: Int, ntfSubCheckInterval :: NominalDiffTime, + ntfMaxMessages :: Int, caCertificateFile :: FilePath, privateKeyFile :: FilePath, certificateFile :: FilePath, @@ -104,6 +105,7 @@ defaultAgentConfig = resubscriptionConcurrency = 16, ntfWorkerThrottle = 1000000, -- microseconds ntfSubCheckInterval = nominalDay, + ntfMaxMessages = 4, -- CA certificate private key is not needed for initialization -- ! we do not generate these caCertificateFile = "/etc/opt/simplex-agent/ca.crt", diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index 4425c83b0..f445f2c03 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -83,6 +83,7 @@ module Simplex.Messaging.Agent.Protocol ACorrId, AgentMsgId, AgentPhase (..), + NotificationInfo (..), -- * Encode/decode serializeCommand, @@ -119,6 +120,7 @@ import qualified Data.List.NonEmpty as L import Data.Maybe (isJust) import Data.Text (Text) import Data.Time.Clock (UTCTime) +import Data.Time.Clock.System (SystemTime) import Data.Time.ISO8601 import Data.Type.Equality import Data.Typeable () @@ -135,6 +137,7 @@ import Simplex.Messaging.Protocol MsgBody, MsgFlags, MsgId, + NMsgMeta, SMPServer, SndPublicVerifyKey, SrvLoc (..), @@ -252,6 +255,17 @@ instance StrEncoding AgentPhase where "SUSPENDED" -> pure APSuspended _ -> fail "bad AgentPhase" +instance ToJSON AgentPhase where + toEncoding = strToJEncoding + toJSON = strToJSON + +data NotificationInfo = NotificationInfo + { ntfConnId :: ConnId, + ntfTs :: SystemTime, + ntfMsgMeta :: Maybe NMsgMeta + } + deriving (Show) + data ConnectionMode = CMInvitation | CMContact deriving (Eq, Show) diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 92d749091..33bc01529 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -291,13 +291,13 @@ subscribeSMPQueue c@ProtocolClient {protocolServer, sessionId, msgQ} rpKey rId = -- | Get message from SMP queue. The server returns ERR PROHIBITED if a client uses SUB and GET via the same transport connection for the same queue -- -- https://github.covm/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#receive-a-message-from-the-queue -getSMPMessage :: SMPClient -> RcvPrivateSignKey -> RecipientId -> ExceptT ProtocolClientError IO (Maybe (MsgId, MsgFlags)) +getSMPMessage :: SMPClient -> RcvPrivateSignKey -> RecipientId -> ExceptT ProtocolClientError IO (Maybe SMP.SMPMsgMeta) getSMPMessage c@ProtocolClient {protocolServer, sessionId, msgQ} rpKey rId = sendSMPCommand c (Just rpKey) rId GET >>= \case OK -> pure Nothing - cmd@(MSG msgId _ msgFlags _) -> do + cmd@(MSG msgId msgTs msgFlags _) -> do lift . atomically $ mapM_ (`writeTBQueue` (protocolServer, sessionId, rId, cmd)) msgQ - pure $ Just (msgId, msgFlags) + pure $ Just SMP.SMPMsgMeta {msgId, msgTs, msgFlags} _ -> throwE PCEUnexpectedResponse -- | Subscribe to the SMP queue notifications. diff --git a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs index 42c669c95..b2e1e2a45 100644 --- a/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs +++ b/src/Simplex/Messaging/Notifications/Server/Push/APNS.hs @@ -272,10 +272,10 @@ apnsNotification :: NtfTknData -> C.CbNonce -> Int -> PushNotification -> Either apnsNotification NtfTknData {tknDhSecret} nonce paddedLen = \case PNVerification (NtfRegCode code) -> encrypt code $ \code' -> - apn APNSBackground {contentAvailable = 1} . Just $ J.object ["verification" .= code', "nonce" .= nonce] + apn APNSBackground {contentAvailable = 1} . Just $ J.object ["nonce" .= nonce, "verification" .= code'] PNMessage pnMessageData -> encrypt (strEncode pnMessageData) $ \ntfData -> - apn apnMutableContent . Just $ J.object ["checkMessage" .= ntfData, "nonce" .= nonce] + apn apnMutableContent . Just $ J.object ["nonce" .= nonce, "message" .= ntfData] PNAlert text -> Right $ apn (apnAlert $ APNSAlertText text) Nothing PNCheckMessages -> Right $ apn APNSBackground {contentAvailable = 1} . Just $ J.object ["checkMessages" .= True] where diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index dfdb6c52d..79c3e74c0 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -79,6 +79,7 @@ module Simplex.Messaging.Protocol MsgId, MsgBody, EncNMsgMeta, + SMPMsgMeta (..), NMsgMeta (..), MsgFlags (..), noMsgFlags, @@ -252,10 +253,18 @@ data BrokerMsg where type EncNMsgMeta = ByteString +data SMPMsgMeta = SMPMsgMeta + { msgId :: MsgId, + msgTs :: SystemTime, + msgFlags :: MsgFlags + } + deriving (Show) + data NMsgMeta = NMsgMeta { msgId :: MsgId, msgTs :: SystemTime } + deriving (Show) instance Encoding NMsgMeta where smpEncode NMsgMeta {msgId, msgTs} = @@ -265,8 +274,10 @@ instance Encoding NMsgMeta where (msgId, msgTs, Tail _) <- smpP pure NMsgMeta {msgId, msgTs} -newtype MsgFlags = MsgFlags {notification :: Bool} - deriving (Eq, Show) +data MsgFlags = MsgFlags {notification :: Bool} + deriving (Eq, Show, Generic) + +instance ToJSON MsgFlags where toEncoding = J.genericToEncoding J.defaultOptions instance Encoding MsgFlags where smpEncode MsgFlags {notification} = smpEncode notification diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index 03bc0af9b..3666f007f 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -27,7 +27,7 @@ import Simplex.Messaging.Agent.Protocol import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Notifications.Protocol import Simplex.Messaging.Notifications.Server.Push.APNS -import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgFlags (MsgFlags)) +import Simplex.Messaging.Protocol (ErrorType (AUTH), MsgFlags (MsgFlags), SMPMsgMeta (..)) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Transport (ATransport) import Simplex.Messaging.Util (tryE) @@ -189,7 +189,7 @@ testNotificationSubscriptionExistingConnection :: APNSMockServer -> IO () testNotificationSubscriptionExistingConnection APNSMockServer {apnsQ} = do alice <- getSMPAgentClient agentCfg initAgentServers bob <- getSMPAgentClient agentCfg {dbFile = testDB2} initAgentServers - Right (bobId, checkMessage, nonce) <- runExceptT $ do + Right (bobId, nonce, message) <- runExceptT $ do -- establish connection (bobId, qInfo) <- createConnection alice SCMInvitation aliceId <- joinConnection bob qInfo "bob's connInfo" @@ -213,16 +213,16 @@ testNotificationSubscriptionExistingConnection APNSMockServer {apnsQ} = do 1 <- msgId <$> sendMessage bob aliceId (SMP.MsgFlags True) "hello" get bob ##> ("", aliceId, SENT $ baseId + 1) -- notification - (checkMessage, nonce) <- messageNotification apnsQ - pure (bobId, checkMessage, nonce) + (nonce, message) <- messageNotification apnsQ + pure (bobId, nonce, message) -- alice client already has subscription for the connection - Left (CMD PROHIBITED) <- runExceptT $ getNotificationMessage alice checkMessage nonce + Left (CMD PROHIBITED) <- runExceptT $ getNotificationMessage alice nonce message -- aliceNtf client doesn't have subscription and is allowed to get notification message aliceNtf <- getSMPAgentClient agentCfg initAgentServers Right () <- runExceptT $ do - Just (_msgId, MsgFlags True) <- getNotificationMessage aliceNtf checkMessage nonce + (_, [SMPMsgMeta {msgFlags = MsgFlags True}]) <- getNotificationMessage aliceNtf nonce message pure () disconnectAgentClient aliceNtf @@ -299,13 +299,13 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do baseId = 3 msgId = subtract baseId -messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (ByteString, C.CbNonce) +messageNotification :: TBQueue APNSMockRequest -> ExceptT AgentErrorType IO (C.CbNonce, ByteString) messageNotification apnsQ = do 500000 `timeout` atomically (readTBQueue apnsQ) >>= \case Nothing -> error "no notification" Just APNSMockRequest {notification = APNSNotification {aps = APNSMutableContent {}, notificationData = Just ntfData}, sendApnsResponse} -> do - checkMessage <- ntfData .-> "checkMessage" nonce <- C.cbNonce <$> ntfData .-> "nonce" + message <- ntfData .-> "message" liftIO $ sendApnsResponse APNSRespOk - pure (checkMessage, nonce) + pure (nonce, message) _ -> error "bad notification" diff --git a/tests/NtfServerTests.hs b/tests/NtfServerTests.hs index f12bc35c1..f7bf3ea27 100644 --- a/tests/NtfServerTests.hs +++ b/tests/NtfServerTests.hs @@ -118,9 +118,9 @@ testNotificationSubscription (ATransport t) = -- receive notification APNSMockRequest {notification, sendApnsResponse = send'} <- atomically $ readTBQueue apnsQ let APNSNotification {aps = APNSMutableContent {}, notificationData = Just ntfData'} = notification - Right checkMessage = ntfData' .-> "checkMessage" Right nonce' = C.cbNonce <$> ntfData' .-> "nonce" - Right ntfDataDecrypted = C.cbDecrypt dhSecret nonce' checkMessage + Right message = ntfData' .-> "message" + Right ntfDataDecrypted = C.cbDecrypt dhSecret nonce' message Right APNS.PNMessageData {smpQueue = SMPQueueNtf {smpServer, notifierId}, nmsgNonce, encNMsgMeta} = parse strP (AP.INTERNAL "error parsing PNMessageData") ntfDataDecrypted Right nMsgMeta = C.cbDecrypt rcvDhSecret nmsgNonce encNMsgMeta