From 4b3d04bd27822c6c307274c311870660fdf55dbe Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Mon, 6 Jun 2022 12:59:45 +0100 Subject: [PATCH] support message flags visible to SMP server to control notifications (and for any future extensions) (#386) * support stopping and resuming agent (#385) * export agentDbPath * support fully closing and resuming agent * whitespace * clean up * support message flags visible to SMP server to control notifications (and for any future extensions) * simplify message flags encoding * GET command --- rfcs/2022-06-05-smp-notifications.md | 34 +++++ simplexmq.cabal | 1 + src/Simplex/Messaging/Agent.hs | 59 +++++---- src/Simplex/Messaging/Agent/Client.hs | 53 +++++--- src/Simplex/Messaging/Agent/Protocol.hs | 23 ++-- src/Simplex/Messaging/Agent/Store.hs | 14 ++- src/Simplex/Messaging/Agent/Store/SQLite.hs | 31 +++-- .../Agent/Store/SQLite/Migrations.hs | 4 +- .../SQLite/Migrations/M20220605_msg_flags.hs | 12 ++ .../Store/SQLite/Migrations/agent_schema.sql | 1 + src/Simplex/Messaging/Client.hs | 19 +-- src/Simplex/Messaging/Encoding.hs | 52 ++++++++ src/Simplex/Messaging/Encoding/String.hs | 25 ++++ .../Messaging/Notifications/Protocol.hs | 12 +- src/Simplex/Messaging/Notifications/Server.hs | 4 +- .../Notifications/Server/Push/testpush.sh | 3 +- src/Simplex/Messaging/Protocol.hs | 82 ++++++++---- src/Simplex/Messaging/Server.hs | 68 +++++++--- src/Simplex/Messaging/Server/Env/STM.hs | 12 +- src/Simplex/Messaging/Server/MsgStore.hs | 4 +- src/Simplex/Messaging/Server/MsgStore/STM.hs | 5 +- src/Simplex/Messaging/Transport.hs | 2 +- src/Simplex/Messaging/Util.hs | 4 + tests/AgentTests.hs | 60 ++++----- tests/AgentTests/FunctionalAPITests.hs | 17 +-- tests/AgentTests/SQLiteTests.hs | 3 + tests/ServerTests.hs | 119 +++++++++--------- 27 files changed, 491 insertions(+), 232 deletions(-) create mode 100644 rfcs/2022-06-05-smp-notifications.md create mode 100644 src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220605_msg_flags.hs diff --git a/rfcs/2022-06-05-smp-notifications.md b/rfcs/2022-06-05-smp-notifications.md new file mode 100644 index 000000000..4e58189b7 --- /dev/null +++ b/rfcs/2022-06-05-smp-notifications.md @@ -0,0 +1,34 @@ +# SMP protocol changes to support push notifications on iOS + +## Problem + +There are already commands/responses to allow subscriptions to message notifications - NKEY/NID, NSUB/NMSG. These commands will be used by SMP agent (NKEY/NID) and by notification server (NSUB/NMSG) to have message notifications delivered to notification server, so it can forward them to APNS server using device token. + +There are two remaining problems that these commands do not solve. + +1. Receiving the message when notification arrives. + +iOS requires creating a bundled notification service extension (NSE) that runs in isolated container and, if we were to use the existing commands, would have SMP subscription to the same SMP servers as the main app, triggering resubscriptions every time the message reception switches between the app and NSE. That would cause a substantial increase in traffic and battery consumption to the users. + +2. Showing notifications for service messages. + +Users do not expect to see notifications for every single SMP messages - e.g., we currently do not show notifications when messages are edited and deleted, and users do not expect them. NSE requires that for every received push notification there should be some notification shown to the users. So only we would have to show a notification for message deletes and updates, we would have to show it for all service messages - e.g. user accepted file invitation, or file transmission has started, contact profile updates and so on. + +We considered differentiating whether notifications are sent per queue, from the recipient side, so we do not send notifications for file queues. But it seems insufficient, particularly if we add such features as message receipts, status updates, etc. + +## Proposal + +1. To retrieve messages when push notifications arrive, we will add 2 SMP commands: + +- GET: retrieve one message from the connection. Resonse could be either MSG (the same as when MSG is delivered, but with the correlation id) or GMSG (to simplify processing) – TBC during implementation. If message is not available, the response could be ERR NO_MSG +- ACK or GACK: acknowledge that the message was processed by the client and can be removed - TBC which one is better. The response is OK or ERR NO_MSG if there was nothing to acknowledge (same as with ACK now) + +This would allow receiving a single message from the queue without subscription, this way avoiding that the main app is unsubscribed from the queue. + +2. The only way to avoid showing unnecessary notifications (status updates, service messages, etc.) is to avoid sending them. That requires instructing SMP server whether notification should be sent both per queue, from the recipient side, and per message - from the sender side. So the notification would only be sent if the queue has them enabled (via NKEY command) and the sender includes an additional flag in SEND command. The same flag should be included into MSG, so when the message is retrieved with GET command, the client knows, on the agent or chat level (or both), whether this message should have notification shown to the user, and if not - retrieve the next one(s) as well. + +This is a substantial change to SMP protocol, that would require client and server upgrade for notifications to be supported. + +We should consider whether to increase the SMP protocol version number to 2, so that the new clients can connect to the old clients but without notifications, or we could keep the old commands in the protocol and instead of adding flags to the existing commands, create new commands. + +We can also consider making commands extensible so that the new flags can be added (and ignored by parsers if not supported) to at least some existing commands. diff --git a/simplexmq.cabal b/simplexmq.cabal index 14abecc27..c6e50f2ed 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -48,6 +48,7 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220301_snd_queue_keys Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220322_notifications Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220404_ntf_subscriptions_draft + Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220605_msg_flags Simplex.Messaging.Client Simplex.Messaging.Client.Agent Simplex.Messaging.Crypto diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index ee67611a7..a19b1c677 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -35,7 +35,8 @@ module Simplex.Messaging.Agent AgentMonad, AgentErrorMonad, getSMPAgentClient, - disconnectAgentClient, -- used in tests + disconnectAgentClient, + resumeAgentClient, withAgentLock, createConnection, joinConnection, @@ -91,7 +92,7 @@ import Simplex.Messaging.Encoding import Simplex.Messaging.Notifications.Client import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfRegCode), NtfTknStatus (..)) import Simplex.Messaging.Parsers (parse) -import Simplex.Messaging.Protocol (BrokerMsg, ErrorType (AUTH), MsgBody) +import Simplex.Messaging.Protocol (BrokerMsg, ErrorType (AUTH), MsgBody, MsgFlags) import qualified Simplex.Messaging.Protocol as SMP import qualified Simplex.Messaging.TMap as TM import Simplex.Messaging.Util (bshow, liftError, tryError, unlessM, ($>>=)) @@ -113,6 +114,9 @@ getSMPAgentClient cfg initServers = newSMPAgentEnv cfg >>= runReaderT runAgent disconnectAgentClient :: MonadUnliftIO m => AgentClient -> m () disconnectAgentClient c = closeAgentClient c >> logConnection c False +resumeAgentClient :: MonadIO m => AgentClient -> m () +resumeAgentClient c = atomically $ writeTVar (active c) True + -- | type AgentErrorMonad m = (MonadUnliftIO m, MonadError AgentErrorType m) @@ -144,8 +148,8 @@ resubscribeConnection :: AgentErrorMonad m => AgentClient -> ConnId -> m () resubscribeConnection c = withAgentEnv c . resubscribeConnection' c -- | Send message to the connection (SEND command) -sendMessage :: AgentErrorMonad m => AgentClient -> ConnId -> MsgBody -> m AgentMsgId -sendMessage c = withAgentEnv c .: sendMessage' c +sendMessage :: AgentErrorMonad m => AgentClient -> ConnId -> MsgFlags -> MsgBody -> m AgentMsgId +sendMessage c = withAgentEnv c .:. sendMessage' c ackMessage :: AgentErrorMonad m => AgentClient -> ConnId -> AgentMsgId -> m () ackMessage c = withAgentEnv c .: ackMessage' c @@ -242,7 +246,7 @@ processCommand c (connId, cmd) = case cmd of ACPT invId ownCInfo -> (,OK) <$> acceptContact' c connId invId ownCInfo RJCT invId -> rejectContact' c connId invId $> (connId, OK) SUB -> subscribeConnection' c connId $> (connId, OK) - SEND msgBody -> (connId,) . MID <$> sendMessage' c connId msgBody + SEND msgFlags msgBody -> (connId,) . MID <$> sendMessage' c connId msgFlags msgBody ACK msgId -> ackMessage' c connId msgId $> (connId, OK) OFF -> suspendConnection' c connId $> (connId, OK) DEL -> deleteConnection' c connId $> (connId, OK) @@ -283,7 +287,7 @@ joinConn c connId (CRInvitationUri (ConnReqUriData _ agentVRange (qUri :| _)) e2 pure connId' tryError (confirmQueue c connId' sq cInfo $ Just e2eSndParams) >>= \case Right _ -> do - void $ enqueueMessage c connId' sq HELLO + void $ enqueueMessage c connId' sq SMP.noMsgFlags HELLO pure connId' Left e -> do -- TODO recovery for failure on network timeout, see rfcs/2022-04-20-smp-conf-timeout-recovery.md @@ -309,7 +313,7 @@ createReplyQueue c connId sq = do let qInfo = toVersionT qUri SMP.smpClientVersion addSubscription c rq connId withStore $ \st -> upgradeSndConnToDuplex st connId rq - void . enqueueMessage c connId sq $ REPLY [qInfo] + void . enqueueMessage c connId sq SMP.noMsgFlags $ REPLY [qInfo] -- | Approve confirmation (LET command) in Reader monad allowConnection' :: AgentMonad m => AgentClient -> ConnId -> ConfirmationId -> ConnInfo -> m () @@ -366,18 +370,18 @@ resubscribeConnection' c connId = (subscribeConnection' c connId) -- | Send message to the connection (SEND command) in Reader monad -sendMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> MsgBody -> m AgentMsgId -sendMessage' c connId msg = +sendMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> MsgFlags -> MsgBody -> m AgentMsgId +sendMessage' c connId msgFlags msg = withStore (`getConn` connId) >>= \case SomeConn _ (DuplexConnection _ _ sq) -> enqueueMsg sq SomeConn _ (SndConnection _ sq) -> enqueueMsg sq _ -> throwError $ CONN SIMPLEX where enqueueMsg :: SndQueue -> m AgentMsgId - enqueueMsg sq = enqueueMessage c connId sq $ A_MSG msg + enqueueMsg sq = enqueueMessage c connId sq msgFlags $ A_MSG msg -enqueueMessage :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> AMessage -> m AgentMsgId -enqueueMessage c connId sq aMessage = do +enqueueMessage :: forall m. AgentMonad m => AgentClient -> ConnId -> SndQueue -> MsgFlags -> AMessage -> m AgentMsgId +enqueueMessage c connId sq msgFlags aMessage = do resumeMsgDelivery c connId sq msgId <- storeSentMsg queuePendingMsgs c connId sq [msgId] @@ -394,7 +398,7 @@ enqueueMessage c connId sq aMessage = do encAgentMessage <- agentRatchetEncrypt connId agentMsgStr e2eEncUserMsgLength let msgBody = smpEncode $ AgentMsgEnvelope {agentVersion = smpAgentVersion, encAgentMessage} msgType = agentMessageType agentMsg - msgData = SndMsgData {internalId, internalSndId, internalTs, msgType, msgBody, internalHash, prevMsgHash} + msgData = SndMsgData {internalId, internalSndId, internalTs, msgType, msgFlags, msgBody, internalHash, prevMsgHash} withStore $ \st -> createSndMsg st connId msgData pure internalId @@ -436,9 +440,12 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} connId sq = do withStore (\st -> E.try $ getPendingMsgData st connId msgId) >>= \case Left (e :: E.SomeException) -> notify $ MERR mId (INTERNAL $ show e) - Right (rq_, (msgType, msgBody, internalTs)) -> - withRetryInterval ri $ \loop -> - tryError (send msgType c sq msgBody) >>= \case + Right (rq_, PendingMsgData {msgType, msgBody, msgFlags, internalTs}) -> + withRetryInterval ri $ \loop -> do + resp <- tryError $ case msgType of + AM_CONN_INFO -> sendConfirmation c sq msgBody + _ -> sendAgentMessage c sq msgFlags msgBody + case resp of Left e -> do let err = if msgType == AM_CONN_INFO then ERR e else MERR mId e case e of @@ -468,7 +475,8 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} connId sq = do AM_CONN_INFO -> do withStore $ \st -> setSndQueueStatus st sq Confirmed when (isJust rq_) $ withStore (`removeConfirmations` connId) - void $ enqueueMessage c connId sq HELLO + -- TODO possibly notification flag should be ON for one of the parties, to result in contact connected notification + void $ enqueueMessage c connId sq SMP.noMsgFlags HELLO AM_HELLO_ -> do withStore $ \st -> setSndQueueStatus st sq Active case rq_ of @@ -482,9 +490,6 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} connId sq = do _ -> pure () delMsg msgId where - send = \case - AM_CONN_INFO -> sendConfirmation - _ -> sendAgentMessage delMsg :: InternalId -> m () delMsg msgId = withStore $ \st -> deleteMsg st connId msgId notify :: ACommand 'Agent -> m () @@ -675,7 +680,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd) processSMP :: SConnType c -> ConnData -> RcvQueue -> m () processSMP cType ConnData {connId} rq@RcvQueue {rcvDhSecret, e2ePrivKey, e2eDhSecret, status} = case cmd of - SMP.MSG srvMsgId srvTs msgBody' -> handleNotifyAck $ do + SMP.MSG srvMsgId srvTs msgFlags msgBody' -> handleNotifyAck $ do -- TODO deduplicate with previously received msgBody <- agentCbDecrypt rcvDhSecret (C.cbNonce srvMsgId) msgBody' clientMsg@SMP.ClientMsgEnvelope {cmHeader = SMP.PubHeader phVer e2ePubKey_} <- @@ -697,12 +702,12 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd) parseMessage agentMsgBody >>= \case agentMsg@(AgentMessage APrivHeader {sndMsgId, prevMsgHash} aMessage) -> do let msgType = agentMessageType agentMsg - (msgId, msgMeta) <- agentClientMsg prevMsgHash sndMsgId (srvMsgId, systemToUTCTime srvTs) agentMsgBody msgType + (msgId, msgMeta) <- agentClientMsg prevMsgHash sndMsgId (srvMsgId, systemToUTCTime srvTs) msgFlags agentMsgBody msgType case aMessage of HELLO -> helloMsg >> ack >> withStore (\st -> deleteMsg st connId msgId) REPLY cReq -> replyMsg cReq >> ack >> withStore (\st -> deleteMsg st connId msgId) -- note that there is no ACK sent here, it is sent with agent's user ACK command - A_MSG body -> notify $ MSG msgMeta body + A_MSG body -> notify $ MSG msgMeta msgFlags body _ -> prohibited >> ack _ -> prohibited >> ack _ -> prohibited >> ack @@ -801,8 +806,8 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd) enqueueConfirmation c connId sq ownConnInfo Nothing _ -> prohibited - agentClientMsg :: PrevRcvMsgHash -> ExternalSndId -> (BrokerId, BrokerTs) -> MsgBody -> AgentMessageType -> m (InternalId, MsgMeta) - agentClientMsg externalPrevSndHash sndMsgId broker msgBody msgType = do + agentClientMsg :: PrevRcvMsgHash -> ExternalSndId -> (BrokerId, BrokerTs) -> MsgFlags -> MsgBody -> AgentMessageType -> m (InternalId, MsgMeta) + agentClientMsg externalPrevSndHash sndMsgId broker msgFlags msgBody msgType = do logServer "<--" c srv rId "MSG " let internalHash = C.sha256Hash msgBody internalTs <- liftIO getCurrentTime @@ -810,7 +815,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (srv, sessId, rId, cmd) let integrity = checkMsgIntegrity prevExtSndId sndMsgId prevRcvMsgHash externalPrevSndHash recipient = (unId internalId, internalTs) msgMeta = MsgMeta {integrity, recipient, broker, sndMsgId} - rcvMsg = RcvMsgData {msgMeta, msgType, msgBody, internalRcvId, internalHash, externalPrevSndHash} + rcvMsg = RcvMsgData {msgMeta, msgType, msgFlags, msgBody, internalRcvId, internalHash, externalPrevSndHash} withStore $ \st -> createRcvMsg st connId rcvMsg pure (internalId, msgMeta) @@ -862,7 +867,7 @@ enqueueConfirmation c connId sq connInfo e2eEncryption = do encConnInfo <- agentRatchetEncrypt connId agentMsgStr e2eEncConnInfoLength let msgBody = smpEncode $ AgentConfirmation {agentVersion = smpAgentVersion, e2eEncryption, encConnInfo} msgType = agentMessageType agentMsg - msgData = SndMsgData {internalId, internalSndId, internalTs, msgType, msgBody, internalHash, prevMsgHash} + msgData = SndMsgData {internalId, internalSndId, internalTs, msgType, msgBody, msgFlags = SMP.noMsgFlags, internalHash, prevMsgHash} withStore $ \st -> createSndMsg st connId msgData pure internalId diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 2d842e3c9..6f186ece2 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -39,6 +39,7 @@ module Simplex.Messaging.Agent.Client logServer, removeSubscription, hasActiveSubscription, + agentDbPath, ) where @@ -63,13 +64,14 @@ import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Store +import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..)) import Simplex.Messaging.Client import Simplex.Messaging.Client.Agent () import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding import Simplex.Messaging.Notifications.Client import Simplex.Messaging.Notifications.Protocol -import Simplex.Messaging.Protocol (BrokerMsg, ErrorType, ProtocolServer (..), QueueId, QueueIdsKeys (..), SndPublicVerifyKey) +import Simplex.Messaging.Protocol (BrokerMsg, ErrorType, MsgFlags (..), ProtocolServer (..), QueueId, QueueIdsKeys (..), SndPublicVerifyKey) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -132,6 +134,9 @@ newAgentClient InitialAgentServers {smp, ntf} agentEnv = do lock <- newTMVar () return AgentClient {active, rcvQ, subQ, msgQ, smpServers, ntfServers, smpClients, ntfClients, subscrSrvrs, pendingSubscrSrvrs, subscrConns, connMsgsQueued, smpQueueMsgQueues, smpQueueMsgDeliveries, reconnections, asyncClients, clientId, agentEnv, smpSubscriber = undefined, lock} +agentDbPath :: AgentClient -> FilePath +agentDbPath AgentClient {agentEnv = Env {store = SQLiteStore {dbFilePath}}} = dbFilePath + -- | Agent monad with MonadReader Env and MonadError AgentErrorType type AgentMonad m = (MonadUnliftIO m, MonadReader Env m, MonadError AgentErrorType m) @@ -184,10 +189,11 @@ getSMPServerClient c@AgentClient {active, smpClients, msgQ} srv = do _ -> TM.insert srv cVar ps serverDown :: UnliftIO m -> Map ConnId RcvQueue -> IO () - serverDown u cs = unless (M.null cs) $ do - let conns = M.keys cs - unless (null conns) . notifySub "" $ DOWN srv conns - whenM (readTVarIO active) $ unliftIO u reconnectServer + serverDown u cs = unless (M.null cs) $ + whenM (readTVarIO active) $ do + let conns = M.keys cs + unless (null conns) . notifySub "" $ DOWN srv conns + unliftIO u reconnectServer reconnectServer :: m () reconnectServer = do @@ -304,21 +310,30 @@ newProtocolClient c srv clients connectClient reconnectClient clientVar = tryCon closeAgentClient :: MonadIO m => AgentClient -> m () closeAgentClient c = liftIO $ do atomically $ writeTVar (active c) False - closeSMPServerClients c + closeProtocolServerClients (clientTimeout smpCfg) $ smpClients c + closeProtocolServerClients (clientTimeout ntfCfg) $ ntfClients c cancelActions $ reconnections c cancelActions $ asyncClients c cancelActions $ smpQueueMsgDeliveries c - -closeSMPServerClients :: AgentClient -> IO () -closeSMPServerClients c = readTVarIO (smpClients c) >>= mapM_ (forkIO . closeClient) + clear subscrSrvrs + clear pendingSubscrSrvrs + clear subscrConns + clear connMsgsQueued + clear smpQueueMsgQueues where - closeClient smpVar = - atomically (readTMVar smpVar) >>= \case - Right smp -> closeProtocolClient smp `catchAll_` pure () + clientTimeout sel = tcpTimeout . sel . config $ agentEnv c + clear sel = atomically $ writeTVar (sel c) M.empty + +closeProtocolServerClients :: Int -> TMap ProtocolServer (ClientVar msg) -> IO () +closeProtocolServerClients tcpTimeout cs = readTVarIO cs >>= mapM_ (forkIO . closeClient) >> atomically (writeTVar cs M.empty) + where + closeClient cVar = + tcpTimeout `timeout` atomically (readTMVar cVar) >>= \case + Just (Right client) -> closeProtocolClient client `catchAll_` pure () _ -> pure () -cancelActions :: Foldable f => TVar (f (Async ())) -> IO () -cancelActions as = readTVarIO as >>= mapM_ uninterruptibleCancel +cancelActions :: (Foldable f, Monoid (f (Async ()))) => TVar (f (Async ())) -> IO () +cancelActions as = readTVarIO as >>= mapM_ uninterruptibleCancel >> atomically (writeTVar as mempty) withAgentLock :: MonadUnliftIO m => AgentClient -> m a -> m a withAgentLock AgentClient {lock} = @@ -450,14 +465,14 @@ sendConfirmation c sq@SndQueue {server, sndId, sndPublicKey = Just sndPublicKey, withLogClient_ c server sndId "SEND " $ \smp -> do let clientMsg = SMP.ClientMessage (SMP.PHConfirmation sndPublicKey) agentConfirmation msg <- agentCbEncrypt sq e2ePubKey $ smpEncode clientMsg - liftClient SMP $ sendSMPMessage smp Nothing sndId msg + liftClient SMP $ sendSMPMessage smp Nothing sndId SMP.noMsgFlags msg sendConfirmation _ _ _ = throwError $ INTERNAL "sendConfirmation called without snd_queue public key(s) in the database" sendInvitation :: forall m. AgentMonad m => AgentClient -> Compatible SMPQueueInfo -> ConnectionRequestUri 'CMInvitation -> ConnInfo -> m () sendInvitation c (Compatible SMPQueueInfo {smpServer, senderId, dhPublicKey}) connReq connInfo = withLogClient_ c smpServer senderId "SEND " $ \smp -> do msg <- mkInvitation - liftClient SMP $ sendSMPMessage smp Nothing senderId msg + liftClient SMP $ sendSMPMessage smp Nothing senderId MsgFlags {notification = True} msg where mkInvitation :: m ByteString -- this is only encrypted with per-queue E2E, not with double ratchet @@ -486,12 +501,12 @@ deleteQueue c RcvQueue {server, rcvId, rcvPrivateKey} = withLogClient c server rcvId "DEL" $ \smp -> deleteSMPQueue smp rcvPrivateKey rcvId -sendAgentMessage :: forall m. AgentMonad m => AgentClient -> SndQueue -> ByteString -> m () -sendAgentMessage c sq@SndQueue {server, sndId, sndPrivateKey} agentMsg = +sendAgentMessage :: forall m. AgentMonad m => AgentClient -> SndQueue -> MsgFlags -> ByteString -> m () +sendAgentMessage c sq@SndQueue {server, sndId, sndPrivateKey} msgFlags agentMsg = withLogClient_ c server sndId "SEND " $ \smp -> do let clientMsg = SMP.ClientMessage SMP.PHEmpty agentMsg msg <- agentCbEncrypt sq Nothing $ smpEncode clientMsg - liftClient SMP $ sendSMPMessage smp (Just sndPrivateKey) sndId msg + liftClient SMP $ sendSMPMessage smp (Just sndPrivateKey) sndId msgFlags msg agentNtfRegisterToken :: AgentMonad m => AgentClient -> NtfToken -> C.APublicVerifyKey -> C.PublicKeyX25519 -> m (NtfTokenId, C.PublicKeyX25519) agentNtfRegisterToken c NtfToken {deviceToken, ntfServer, ntfPrivKey} ntfPubKey pubDhKey = diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index 31ce96c3b..bfb4b785a 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -110,7 +110,7 @@ import qualified Data.Attoparsec.ByteString.Char8 as A import Data.ByteString.Base64 import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B -import Data.Composition ((.:)) +import Data.Composition ((.:), (.:.)) import Data.Functor (($>)) import Data.Int (Int64) import Data.Kind (Type) @@ -132,6 +132,7 @@ import Simplex.Messaging.Parsers import Simplex.Messaging.Protocol ( ErrorType, MsgBody, + MsgFlags, MsgId, SMPServer, SndPublicVerifyKey, @@ -212,11 +213,11 @@ data ACommand (p :: AParty) where END :: ACommand Agent DOWN :: SMPServer -> [ConnId] -> ACommand Agent UP :: SMPServer -> [ConnId] -> ACommand Agent - SEND :: MsgBody -> ACommand Client + SEND :: MsgFlags -> MsgBody -> ACommand Client MID :: AgentMsgId -> ACommand Agent SENT :: AgentMsgId -> ACommand Agent MERR :: AgentMsgId -> AgentErrorType -> ACommand Agent - MSG :: MsgMeta -> MsgBody -> ACommand Agent + MSG :: MsgMeta -> MsgFlags -> MsgBody -> ACommand Agent ACK :: AgentMsgId -> ACommand Client OFF :: ACommand Client DEL :: ACommand Client @@ -840,13 +841,13 @@ commandP = acptCmd = ACmd SClient .: ACPT <$> A.takeTill (== ' ') <* A.space <*> A.takeByteString rjctCmd = ACmd SClient . RJCT <$> A.takeByteString infoCmd = ACmd SAgent . INFO <$> A.takeByteString - downsResp = ACmd SAgent .: DOWN <$> strP <* A.space <*> connections - upsResp = ACmd SAgent .: UP <$> strP <* A.space <*> connections - sendCmd = ACmd SClient . SEND <$> A.takeByteString + downsResp = ACmd SAgent .: DOWN <$> strP_ <*> connections + upsResp = ACmd SAgent .: UP <$> strP_ <*> connections + sendCmd = ACmd SClient .: SEND <$> smpP <* A.space <*> A.takeByteString msgIdResp = ACmd SAgent . MID <$> A.decimal sentResp = ACmd SAgent . SENT <$> A.decimal msgErrResp = ACmd SAgent .: MERR <$> A.decimal <* A.space <*> strP - message = ACmd SAgent .: MSG <$> msgMetaP <* A.space <*> A.takeByteString + message = ACmd SAgent .:. MSG <$> msgMetaP <* A.space <*> smpP <* A.space <*> A.takeByteString ackCmd = ACmd SClient . ACK <$> A.decimal connections = strP `A.sepBy'` (A.char ',') msgMetaP = do @@ -877,11 +878,11 @@ serializeCommand = \case END -> "END" DOWN srv conns -> B.unwords ["DOWN", strEncode srv, connections conns] UP srv conns -> B.unwords ["UP", strEncode srv, connections conns] - SEND msgBody -> "SEND " <> serializeBinary msgBody + SEND msgFlags msgBody -> "SEND " <> smpEncode msgFlags <> " " <> serializeBinary msgBody MID mId -> "MID " <> bshow mId SENT mId -> "SENT " <> bshow mId MERR mId e -> B.unwords ["MERR", bshow mId, strEncode e] - MSG msgMeta msgBody -> B.unwords ["MSG", serializeMsgMeta msgMeta, serializeBinary msgBody] + MSG msgMeta msgFlags msgBody -> B.unwords ["MSG", serializeMsgMeta msgMeta, smpEncode msgFlags, serializeBinary msgBody] ACK mId -> "ACK " <> bshow mId OFF -> "OFF" DEL -> "DEL" @@ -953,8 +954,8 @@ tGet party h = liftIO (tGetRaw h) >>= tParseLoadBody cmdWithMsgBody :: ACommand p -> m (Either AgentErrorType (ACommand p)) cmdWithMsgBody = \case - SEND body -> SEND <$$> getBody body - MSG msgMeta body -> MSG msgMeta <$$> getBody body + SEND msgFlags body -> SEND msgFlags <$$> getBody body + MSG msgMeta msgFlags body -> MSG msgMeta msgFlags <$$> getBody body JOIN qUri cInfo -> JOIN qUri <$$> getBody cInfo CONF confId cInfo -> CONF confId <$$> getBody cInfo LET confId cInfo -> LET confId <$$> getBody cInfo diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index ec564ceb8..34c1579ae 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -24,6 +24,7 @@ import Simplex.Messaging.Notifications.Client import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfTknStatus, NtfTokenId) import Simplex.Messaging.Protocol ( MsgBody, + MsgFlags, MsgId, RcvDhSecret, RcvPrivateSignKey, @@ -64,7 +65,7 @@ class Monad m => MonadAgentStore s m where createRcvMsg :: s -> ConnId -> RcvMsgData -> m () updateSndIds :: s -> ConnId -> m (InternalId, InternalSndId, PrevSndMsgHash) createSndMsg :: s -> ConnId -> SndMsgData -> m () - getPendingMsgData :: s -> ConnId -> InternalId -> m (Maybe RcvQueue, (AgentMessageType, MsgBody, InternalTs)) + getPendingMsgData :: s -> ConnId -> InternalId -> m (Maybe RcvQueue, PendingMsgData) getPendingMsgs :: s -> ConnId -> m [InternalId] checkRcvMsg :: s -> ConnId -> InternalId -> m () deleteMsg :: s -> ConnId -> InternalId -> m () @@ -235,6 +236,7 @@ type PrevSndMsgHash = MsgHash data RcvMsgData = RcvMsgData { msgMeta :: MsgMeta, msgType :: AgentMessageType, + msgFlags :: MsgFlags, msgBody :: MsgBody, internalRcvId :: InternalRcvId, internalHash :: MsgHash, @@ -246,14 +248,18 @@ data SndMsgData = SndMsgData internalSndId :: InternalSndId, internalTs :: InternalTs, msgType :: AgentMessageType, + msgFlags :: MsgFlags, msgBody :: MsgBody, internalHash :: MsgHash, prevMsgHash :: MsgHash } -data PendingMsg = PendingMsg - { connId :: ConnId, - msgId :: InternalId +data PendingMsgData = PendingMsgData + { msgId :: InternalId, + msgType :: AgentMessageType, + msgFlags :: MsgFlags, + msgBody :: MsgBody, + internalTs :: InternalTs } deriving (Show) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 5c4e679b7..ec34ab9a3 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -43,7 +43,7 @@ import qualified Data.Map.Strict as M import Data.Maybe (listToMaybe) import Data.Text (Text) import qualified Data.Text as T -import Data.Text.Encoding (decodeLatin1) +import Data.Text.Encoding (decodeLatin1, encodeUtf8) import Data.Time.Clock (getCurrentTime) import Database.SQLite.Simple (FromRow, NamedParam (..), Only (..), SQLError, ToRow, field, (:.) (..)) import qualified Database.SQLite.Simple as DB @@ -61,9 +61,9 @@ import Simplex.Messaging.Encoding.String import Simplex.Messaging.Notifications.Client (NtfServer, NtfTknAction, NtfToken (..)) import Simplex.Messaging.Notifications.Protocol (DeviceToken (..), NtfTknStatus (..), NtfTokenId) import Simplex.Messaging.Parsers (blobFieldParser, fromTextField_) -import Simplex.Messaging.Protocol (MsgBody, ProtocolServer (..)) +import Simplex.Messaging.Protocol (MsgBody, MsgFlags, ProtocolServer (..)) import qualified Simplex.Messaging.Protocol as SMP -import Simplex.Messaging.Util (bshow, liftIOEither) +import Simplex.Messaging.Util (bshow, eitherToMaybe, liftIOEither) import System.Directory (copyFile, createDirectoryIfMissing, doesFileExist) import System.Exit (exitFailure) import System.FilePath (takeDirectory) @@ -459,22 +459,25 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto insertSndMsgDetails_ db connId sndMsgData updateHashSnd_ db connId sndMsgData - getPendingMsgData :: SQLiteStore -> ConnId -> InternalId -> m (Maybe RcvQueue, (AgentMessageType, MsgBody, InternalTs)) + getPendingMsgData :: SQLiteStore -> ConnId -> InternalId -> m (Maybe RcvQueue, PendingMsgData) getPendingMsgData st connId msgId = liftIOEither . withTransaction st $ \db -> runExceptT $ do rq_ <- liftIO $ getRcvQueueByConnId_ db connId msgData <- - ExceptT . firstRow id SEMsgNotFound $ + ExceptT . firstRow pendingMsgData SEMsgNotFound $ DB.query db [sql| - SELECT m.msg_type, m.msg_body, m.internal_ts + SELECT m.msg_type, m.msg_flags, m.msg_body, m.internal_ts FROM messages m JOIN snd_messages s ON s.conn_id = m.conn_id AND s.internal_id = m.internal_id WHERE m.conn_id = ? AND m.internal_id = ? |] (connId, msgId) pure (rq_, msgData) + where + pendingMsgData :: (AgentMessageType, MsgFlags, MsgBody, InternalTs) -> PendingMsgData + pendingMsgData (msgType, msgFlags, msgBody, internalTs) = PendingMsgData {msgId, msgType, msgFlags, msgBody, internalTs} getPendingMsgs :: SQLiteStore -> ConnId -> m [InternalId] getPendingMsgs st connId = @@ -681,6 +684,10 @@ instance ToField (SConnectionMode c) where toField = toField . connMode instance FromField AConnectionMode where fromField = fromTextField_ $ fmap connMode' . connModeT +instance ToField MsgFlags where toField = toField . decodeLatin1 . smpEncode + +instance FromField MsgFlags where fromField = fromTextField_ $ eitherToMaybe . smpDecode . encodeUtf8 + listToEither :: e -> [a] -> Either e a listToEither _ (x : _) = Right x listToEither e _ = Left e @@ -862,21 +869,22 @@ updateLastIdsRcv_ dbConn connId newInternalId newInternalRcvId = -- * createRcvMsg helpers insertRcvMsgBase_ :: DB.Connection -> ConnId -> RcvMsgData -> IO () -insertRcvMsgBase_ dbConn connId RcvMsgData {msgMeta, msgType, msgBody, internalRcvId} = do +insertRcvMsgBase_ dbConn connId RcvMsgData {msgMeta, msgType, msgFlags, msgBody, internalRcvId} = do let MsgMeta {recipient = (internalId, internalTs)} = msgMeta DB.executeNamed dbConn [sql| INSERT INTO messages - ( conn_id, internal_id, internal_ts, internal_rcv_id, internal_snd_id, msg_type, msg_body) + ( conn_id, internal_id, internal_ts, internal_rcv_id, internal_snd_id, msg_type, msg_flags, msg_body) VALUES - (:conn_id,:internal_id,:internal_ts,:internal_rcv_id, NULL,:msg_type, :msg_body); + (:conn_id,:internal_id,:internal_ts,:internal_rcv_id, NULL,:msg_type,:msg_flags,:msg_body); |] [ ":conn_id" := connId, ":internal_id" := internalId, ":internal_ts" := internalTs, ":internal_rcv_id" := internalRcvId, ":msg_type" := msgType, + ":msg_flags" := msgFlags, ":msg_body" := msgBody ] @@ -962,15 +970,16 @@ insertSndMsgBase_ dbConn connId SndMsgData {..} = do dbConn [sql| INSERT INTO messages - ( conn_id, internal_id, internal_ts, internal_rcv_id, internal_snd_id, msg_type, msg_body) + ( conn_id, internal_id, internal_ts, internal_rcv_id, internal_snd_id, msg_type, msg_flags, msg_body) VALUES - (:conn_id,:internal_id,:internal_ts, NULL,:internal_snd_id,:msg_type, :msg_body); + (:conn_id,:internal_id,:internal_ts, NULL,:internal_snd_id,:msg_type,:msg_flags,:msg_body); |] [ ":conn_id" := connId, ":internal_id" := internalId, ":internal_ts" := internalTs, ":internal_snd_id" := internalSndId, ":msg_type" := msgType, + ":msg_flags" := msgFlags, ":msg_body" := msgBody ] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs index 7e7f9f5b4..449c7c340 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs @@ -27,6 +27,7 @@ import qualified Database.SQLite3 as SQLite3 import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220101_initial import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220301_snd_queue_keys import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220322_notifications +import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220605_msg_flags data Migration = Migration {name :: String, up :: Text} deriving (Show) @@ -35,7 +36,8 @@ schemaMigrations :: [(String, Query)] schemaMigrations = [ ("20220101_initial", m20220101_initial), ("20220301_snd_queue_keys", m20220301_snd_queue_keys), - ("20220322_notifications", m20220322_notifications) + ("20220322_notifications", m20220322_notifications), + ("20220605_msg_flags", m20220605_msg_flags) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220605_msg_flags.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220605_msg_flags.hs new file mode 100644 index 000000000..2b6311a63 --- /dev/null +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20220605_msg_flags.hs @@ -0,0 +1,12 @@ +{-# LANGUAGE QuasiQuotes #-} + +module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220605_msg_flags where + +import Database.SQLite.Simple (Query) +import Database.SQLite.Simple.QQ (sql) + +m20220605_msg_flags :: Query +m20220605_msg_flags = + [sql| +ALTER TABLE messages ADD COLUMN msg_flags TEXT NULL; +|] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index d5fbc7718..1e694c87f 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -64,6 +64,7 @@ CREATE TABLE messages( internal_snd_id INTEGER, msg_type BLOB NOT NULL, --(H)ELLO,(R)EPLY,(D)ELETE. Should SMP confirmation be saved too? msg_body BLOB NOT NULL DEFAULT x'', + msg_flags TEXT NULL, PRIMARY KEY(conn_id, internal_id), FOREIGN KEY(conn_id, internal_rcv_id) REFERENCES rcv_messages ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index dbe47d5f6..e87a56e8c 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -70,6 +70,7 @@ import Simplex.Messaging.Transport.Client (runTransportClient) import Simplex.Messaging.Transport.KeepAlive import Simplex.Messaging.Transport.WebSockets (WS) import Simplex.Messaging.Util (bshow, liftError, raceAny_) +import Simplex.Messaging.Version import System.Timeout (timeout) -- | 'SMPClient' is a handle used to send commands to a specific SMP server. @@ -79,6 +80,7 @@ data ProtocolClient msg = ProtocolClient { action :: Async (), connected :: TVar Bool, sessionId :: SessionId, + thVersion :: Version, protocolServer :: ProtocolServer, tcpTimeout :: Int, clientCorrId :: TVar Natural, @@ -146,6 +148,7 @@ getProtocolClient protocolServer cfg@ProtocolClientConfig {qSize, tcpTimeout, tc ProtocolClient { action = undefined, sessionId = undefined, + thVersion = undefined, connected, protocolServer, tcpTimeout, @@ -165,7 +168,7 @@ getProtocolClient protocolServer cfg@ProtocolClientConfig {qSize, tcpTimeout, tc `finally` atomically (putTMVar thVar $ Left PCENetworkError) th_ <- tcpTimeout `timeout` atomically (takeTMVar thVar) pure $ case th_ of - Just (Right THandle {sessionId}) -> Right c {action, sessionId} + Just (Right THandle {sessionId, thVersion}) -> Right c {action, sessionId, thVersion} Just (Left e) -> Left e Nothing -> Left PCENetworkError @@ -179,11 +182,11 @@ getProtocolClient protocolServer cfg@ProtocolClientConfig {qSize, tcpTimeout, tc client _ c thVar h = runExceptT (protocolClientHandshake @msg h $ keyHash protocolServer) >>= \case Left e -> atomically . putTMVar thVar . Left $ PCETransportError e - Right th@THandle {sessionId} -> do + Right th@THandle {sessionId, thVersion} -> do atomically $ do writeTVar (connected c) True putTMVar thVar $ Right th - let c' = c {sessionId} :: ProtocolClient msg + let c' = c {sessionId, thVersion} :: ProtocolClient msg -- TODO remove ping if 0 is passed (or Nothing?) raceAny_ [send c' th, process c', receive c' th, ping c'] `finally` disconnected @@ -305,9 +308,9 @@ enableSMPQueueNotifications c rpKey rId notifierKey = -- | Send SMP message. -- -- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#send-message -sendSMPMessage :: SMPClient -> Maybe SndPrivateSignKey -> SenderId -> MsgBody -> ExceptT ProtocolClientError IO () -sendSMPMessage c spKey sId msg = - sendSMPCommand c spKey sId (SEND msg) >>= \case +sendSMPMessage :: SMPClient -> Maybe SndPrivateSignKey -> SenderId -> MsgFlags -> MsgBody -> ExceptT ProtocolClientError IO () +sendSMPMessage c spKey sId flags msg = + sendSMPCommand c spKey sId (SEND flags msg) >>= \case OK -> pure () _ -> throwE PCEUnexpectedResponse @@ -347,9 +350,9 @@ sendSMPCommand c pKey qId cmd = sendProtocolCommand c pKey qId (Cmd sParty cmd) -- | Send Protocol command sendProtocolCommand :: forall msg. ProtocolEncoding (ProtocolCommand msg) => ProtocolClient msg -> Maybe C.APrivateSignKey -> QueueId -> ProtocolCommand msg -> ExceptT ProtocolClientError IO msg -sendProtocolCommand ProtocolClient {sndQ, sentCommands, clientCorrId, sessionId, tcpTimeout} pKey qId cmd = do +sendProtocolCommand ProtocolClient {sndQ, sentCommands, clientCorrId, sessionId, thVersion, tcpTimeout} pKey qId cmd = do corrId <- lift_ getNextCorrId - t <- signTransmission $ encodeTransmission sessionId (corrId, qId, cmd) + t <- signTransmission $ encodeTransmission thVersion sessionId (corrId, qId, cmd) ExceptT $ sendRecv corrId t where lift_ :: STM a -> ExceptT ProtocolClientError IO a diff --git a/src/Simplex/Messaging/Encoding.hs b/src/Simplex/Messaging/Encoding.hs index 6c7c56ddd..695065419 100644 --- a/src/Simplex/Messaging/Encoding.hs +++ b/src/Simplex/Messaging/Encoding.hs @@ -47,66 +47,100 @@ class Encoding a where instance Encoding Char where smpEncode = B.singleton + {-# INLINE smpEncode #-} smpP = A.anyChar + {-# INLINE smpP #-} + +instance Encoding Bool where + smpEncode = \case + True -> "T" + False -> "F" + {-# INLINE smpEncode #-} + smpP = + smpP >>= \case + 'T' -> pure True + 'F' -> pure False + _ -> fail "invalid Bool" + {-# INLINE smpP #-} instance Encoding Word16 where smpEncode = encodeWord16 + {-# INLINE smpEncode #-} smpP = decodeWord16 <$> A.take 2 + {-# INLINE smpP #-} instance Encoding Word32 where smpEncode = encodeWord32 + {-# INLINE smpEncode #-} smpP = decodeWord32 <$> A.take 4 + {-# INLINE smpP #-} instance Encoding Int64 where smpEncode i = w32 (i `shiftR` 32) <> w32 i + {-# INLINE smpEncode #-} smpP = do l <- w32P r <- w32P pure $ (l `shiftL` 32) .|. r + {-# INLINE smpP #-} w32 :: Int64 -> ByteString w32 = smpEncode @Word32 . fromIntegral +{-# INLINE w32 #-} w32P :: Parser Int64 w32P = fromIntegral <$> smpP @Word32 +{-# INLINE w32P #-} -- ByteStrings are assumed no longer than 255 bytes instance Encoding ByteString where smpEncode s = B.cons (lenEncode $ B.length s) s + {-# INLINE smpEncode #-} smpP = A.take =<< lenP + {-# INLINE smpP #-} lenEncode :: Int -> Char lenEncode = w2c . fromIntegral +{-# INLINE lenEncode #-} lenP :: Parser Int lenP = fromIntegral . c2w <$> A.anyChar +{-# INLINE lenP #-} instance Encoding a => Encoding (Maybe a) where smpEncode s = maybe "0" (("1" <>) . smpEncode) s + {-# INLINE smpEncode #-} smpP = smpP >>= \case '0' -> pure Nothing '1' -> Just <$> smpP _ -> fail "invalid Maybe tag" + {-# INLINE smpP #-} newtype Tail = Tail {unTail :: ByteString} instance Encoding Tail where smpEncode = unTail + {-# INLINE smpEncode #-} smpP = Tail <$> A.takeByteString + {-# INLINE smpP #-} -- newtype for encoding/decoding ByteStrings over 255 bytes with 2-bytes length prefix newtype Large = Large {unLarge :: ByteString} instance Encoding Large where smpEncode (Large s) = smpEncode @Word16 (fromIntegral $ B.length s) <> s + {-# INLINE smpEncode #-} smpP = do len <- fromIntegral <$> smpP @Word16 Large <$> A.take len + {-# INLINE smpP #-} instance Encoding SystemTime where smpEncode = smpEncode . systemSeconds + {-# INLINE smpEncode #-} smpP = MkSystemTime <$> smpP <*> pure 0 + {-# INLINE smpP #-} -- lists encode/parse as a sequence of items prefixed with list length (as 1 byte) smpEncodeList :: Encoding a => [a] -> ByteString @@ -117,7 +151,9 @@ smpListP = (`A.count` smpP) =<< lenP instance Encoding String where smpEncode = smpEncode . B.pack + {-# INLINE smpEncode #-} smpP = B.unpack <$> smpP + {-# INLINE smpP #-} instance Encoding a => Encoding (L.NonEmpty a) where smpEncode = smpEncodeList . L.toList @@ -128,20 +164,36 @@ instance Encoding a => Encoding (L.NonEmpty a) where instance (Encoding a, Encoding b) => Encoding (a, b) where smpEncode (a, b) = smpEncode a <> smpEncode b + {-# INLINE smpEncode #-} smpP = (,) <$> smpP <*> smpP + {-# INLINE smpP #-} instance (Encoding a, Encoding b, Encoding c) => Encoding (a, b, c) where smpEncode (a, b, c) = smpEncode a <> smpEncode b <> smpEncode c + {-# INLINE smpEncode #-} smpP = (,,) <$> smpP <*> smpP <*> smpP + {-# INLINE smpP #-} instance (Encoding a, Encoding b, Encoding c, Encoding d) => Encoding (a, b, c, d) where smpEncode (a, b, c, d) = smpEncode a <> smpEncode b <> smpEncode c <> smpEncode d + {-# INLINE smpEncode #-} smpP = (,,,) <$> smpP <*> smpP <*> smpP <*> smpP + {-# INLINE smpP #-} instance (Encoding a, Encoding b, Encoding c, Encoding d, Encoding e) => Encoding (a, b, c, d, e) where smpEncode (a, b, c, d, e) = smpEncode a <> smpEncode b <> smpEncode c <> smpEncode d <> smpEncode e + {-# INLINE smpEncode #-} smpP = (,,,,) <$> smpP <*> smpP <*> smpP <*> smpP <*> smpP + {-# INLINE smpP #-} instance (Encoding a, Encoding b, Encoding c, Encoding d, Encoding e, Encoding f) => Encoding (a, b, c, d, e, f) where smpEncode (a, b, c, d, e, f) = smpEncode a <> smpEncode b <> smpEncode c <> smpEncode d <> smpEncode e <> smpEncode f + {-# INLINE smpEncode #-} smpP = (,,,,,) <$> smpP <*> smpP <*> smpP <*> smpP <*> smpP <*> smpP + {-# INLINE smpP #-} + +instance (Encoding a, Encoding b, Encoding c, Encoding d, Encoding e, Encoding f, Encoding g) => Encoding (a, b, c, d, e, f, g) where + smpEncode (a, b, c, d, e, f, g) = smpEncode a <> smpEncode b <> smpEncode c <> smpEncode d <> smpEncode e <> smpEncode f <> smpEncode g + {-# INLINE smpEncode #-} + smpP = (,,,,,,) <$> smpP <*> smpP <*> smpP <*> smpP <*> smpP <*> smpP <*> smpP + {-# INLINE smpP #-} diff --git a/src/Simplex/Messaging/Encoding/String.hs b/src/Simplex/Messaging/Encoding/String.hs index 52b26ef1d..d61d1a558 100644 --- a/src/Simplex/Messaging/Encoding/String.hs +++ b/src/Simplex/Messaging/Encoding/String.hs @@ -30,6 +30,7 @@ import qualified Data.List.NonEmpty as L import Data.Text (Text) import Data.Text.Encoding (decodeLatin1, encodeUtf8) import Data.Word (Word16) +import Simplex.Messaging.Encoding import Simplex.Messaging.Parsers (parseAll) import Simplex.Messaging.Util ((<$?>)) @@ -76,11 +77,27 @@ instance FromJSON Str where instance StrEncoding a => StrEncoding (Maybe a) where strEncode = maybe "" strEncode + {-# INLINE strEncode #-} strP = optional strP + {-# INLINE strP #-} instance StrEncoding Word16 where strEncode = B.pack . show + {-# INLINE strEncode #-} strP = A.decimal + {-# INLINE strP #-} + +instance StrEncoding Char where + strEncode = smpEncode + {-# INLINE strEncode #-} + strP = strP + {-# INLINE strP #-} + +instance StrEncoding Bool where + strEncode = smpEncode + {-# INLINE strEncode #-} + strP = smpP + {-# INLINE strP #-} -- lists encode/parse as comma-separated strings strEncodeList :: StrEncoding a => [a] -> ByteString @@ -99,19 +116,27 @@ listItem = parseAll strP <$?> A.takeTill (== ',') instance (StrEncoding a, StrEncoding b) => StrEncoding (a, b) where strEncode (a, b) = B.unwords [strEncode a, strEncode b] + {-# INLINE strEncode #-} strP = (,) <$> strP_ <*> strP + {-# INLINE strP #-} instance (StrEncoding a, StrEncoding b, StrEncoding c) => StrEncoding (a, b, c) where strEncode (a, b, c) = B.unwords [strEncode a, strEncode b, strEncode c] + {-# INLINE strEncode #-} strP = (,,) <$> strP_ <*> strP_ <*> strP + {-# INLINE strP #-} instance (StrEncoding a, StrEncoding b, StrEncoding c, StrEncoding d) => StrEncoding (a, b, c, d) where strEncode (a, b, c, d) = B.unwords [strEncode a, strEncode b, strEncode c, strEncode d] + {-# INLINE strEncode #-} strP = (,,,) <$> strP_ <*> strP_ <*> strP_ <*> strP + {-# INLINE strP #-} instance (StrEncoding a, StrEncoding b, StrEncoding c, StrEncoding d, StrEncoding e) => StrEncoding (a, b, c, d, e) where strEncode (a, b, c, d, e) = B.unwords [strEncode a, strEncode b, strEncode c, strEncode d, strEncode e] + {-# INLINE strEncode #-} strP = (,,,,) <$> strP_ <*> strP_ <*> strP_ <*> strP_ <*> strP + {-# INLINE strP #-} strP_ :: StrEncoding a => Parser a strP_ = strP <* A.space diff --git a/src/Simplex/Messaging/Notifications/Protocol.hs b/src/Simplex/Messaging/Notifications/Protocol.hs index 4ff643136..f997c3c44 100644 --- a/src/Simplex/Messaging/Notifications/Protocol.hs +++ b/src/Simplex/Messaging/Notifications/Protocol.hs @@ -176,7 +176,7 @@ deriving instance Show NtfCmd instance NtfEntityI e => ProtocolEncoding (NtfCommand e) where type Tag (NtfCommand e) = NtfCommandTag e - encodeProtocol = \case + encodeProtocol _v = \case TNEW newTkn -> e (TNEW_, ' ', newTkn) TVFY code -> e (TVFY_, ' ', code) TCHK -> e TCHK_ @@ -190,7 +190,7 @@ instance NtfEntityI e => ProtocolEncoding (NtfCommand e) where e :: Encoding a => a -> ByteString e = smpEncode - protocolP tag = (\(NtfCmd _ c) -> checkEntity c) <$?> protocolP (NCT (sNtfEntity @e) tag) + protocolP _v tag = (\(NtfCmd _ c) -> checkEntity c) <$?> protocolP _v (NCT (sNtfEntity @e) tag) checkCredentials (sig, _, entityId, _) cmd = case cmd of -- TNEW and SNEW must have signature but NOT token/subscription IDs @@ -211,9 +211,9 @@ instance NtfEntityI e => ProtocolEncoding (NtfCommand e) where instance ProtocolEncoding NtfCmd where type Tag NtfCmd = NtfCmdTag - encodeProtocol (NtfCmd _ c) = encodeProtocol c + encodeProtocol _v (NtfCmd _ c) = encodeProtocol _v c - protocolP = \case + protocolP _v = \case NCT SToken tag -> NtfCmd SToken <$> case tag of TNEW_ -> TNEW <$> _smpP @@ -274,7 +274,7 @@ data NtfResponse instance ProtocolEncoding NtfResponse where type Tag NtfResponse = NtfResponseTag - encodeProtocol = \case + encodeProtocol _v = \case NRTknId entId dhKey -> e (NRTknId_, ' ', entId, dhKey) NRSubId entId -> e (NRSubId_, ' ', entId) NROk -> e NROk_ @@ -286,7 +286,7 @@ instance ProtocolEncoding NtfResponse where e :: Encoding a => a -> ByteString e = smpEncode - protocolP = \case + protocolP _v = \case NRTknId_ -> NRTknId <$> _smpP <*> smpP NRSubId_ -> NRSubId <$> _smpP NROk_ -> pure NROk diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 125a8e5d2..e1011b290 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -189,9 +189,9 @@ receive th NtfServerClient {rcvQ, sndQ, activeAt} = forever $ do write q t = atomically $ writeTBQueue q t send :: (Transport c, MonadUnliftIO m) => THandle c -> NtfServerClient -> m () -send h NtfServerClient {sndQ, sessionId, activeAt} = forever $ do +send h@THandle {thVersion = v} NtfServerClient {sndQ, sessionId, activeAt} = forever $ do t <- atomically $ readTBQueue sndQ - void . liftIO $ tPut h (Nothing, encodeTransmission sessionId t) + void . liftIO $ tPut h (Nothing, encodeTransmission v sessionId t) atomically . writeTVar activeAt =<< liftIO getSystemTime data VerificationResult = VRVerified NtfRequest | VRFailed diff --git a/src/Simplex/Messaging/Notifications/Server/Push/testpush.sh b/src/Simplex/Messaging/Notifications/Server/Push/testpush.sh index b41135d83..bac5d8684 100755 --- a/src/Simplex/Messaging/Notifications/Server/Push/testpush.sh +++ b/src/Simplex/Messaging/Notifications/Server/Push/testpush.sh @@ -22,4 +22,5 @@ export AUTHENTICATION_TOKEN="${JWT_HEADER}.${JWT_CLAIMS}.${JWT_SIGNED_HEADER_CLA # curl -v --header "apns-topic: $TOPIC" --header "apns-push-type: background" --header "apns-priority: 5" --header "authorization: bearer $AUTHENTICATION_TOKEN" --data '{"aps":{"content-available":1}}' --http2 https://${APNS_HOST_NAME}/3/device/${DEVICE_TOKEN} # mutable-content notification -curl -v --header "apns-topic: $TOPIC" --header "apns-push-type: alert" --header "authorization: bearer $AUTHENTICATION_TOKEN" --data '{"aps":{"category": "NTF_CAT_CHECK_MESSAGE__SECRET", "mutable-content": 1, "alert":"received encrypted message"}, "data": {"test":"123"}}' --http2 https://${APNS_HOST_NAME}/3/device/${DEVICE_TOKEN} \ No newline at end of file +# NTF_CAT_CHECK_MESSAGE category will not show alert if the app is in foreground +curl -v --header "apns-topic: $TOPIC" --header "apns-push-type: alert" --header "authorization: bearer $AUTHENTICATION_TOKEN" --data '{"aps":{"category": "NTF_CAT_CHECK_MESSAGE__SECRET", "mutable-content": 1, "alert":"received encrypted message"}, "data": {"test":"123"}}' --http2 https://${APNS_HOST_NAME}/3/device/${DEVICE_TOKEN} diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index dde5992b3..2503bb9f2 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -78,6 +78,8 @@ module Simplex.Messaging.Protocol NtfPublicVerifyKey, MsgId, MsgBody, + MsgFlags (..), + noMsgFlags, -- * Parse and serialize ProtocolMsgTag (..), @@ -214,11 +216,14 @@ data Command (p :: Party) where SUB :: Command Recipient KEY :: SndPublicVerifyKey -> Command Recipient NKEY :: NtfPublicVerifyKey -> Command Recipient + GET :: Command Recipient ACK :: Command Recipient OFF :: Command Recipient DEL :: Command Recipient -- SMP sender commands - SEND :: MsgBody -> Command Sender + -- SEND v1 has to be supported for encoding/decoding + -- SEND :: MsgBody -> Command Sender + SEND :: MsgFlags -> MsgBody -> Command Sender PING :: Command Sender -- SMP notification subscriber commands NSUB :: Command Notifier @@ -230,7 +235,9 @@ deriving instance Eq (Command p) data BrokerMsg where -- SMP broker messages (responses, client messages, notifications) IDS :: QueueIdsKeys -> BrokerMsg - MSG :: MsgId -> SystemTime -> MsgBody -> BrokerMsg + -- MSG v1 has to be supported for encoding/decoding + -- MSG :: MsgId -> SystemTime -> MsgBody -> BrokerMsg + MSG :: MsgId -> SystemTime -> MsgFlags -> MsgBody -> BrokerMsg NID :: NotifierId -> BrokerMsg NMSG :: BrokerMsg END :: BrokerMsg @@ -239,6 +246,18 @@ data BrokerMsg where PONG :: BrokerMsg deriving (Eq, Show) +newtype MsgFlags = MsgFlags {notification :: Bool} + deriving (Eq, Show) + +instance Encoding MsgFlags where + smpEncode MsgFlags {notification} = smpEncode notification + smpP = do + notification <- smpP <* A.takeTill (== ' ') + pure MsgFlags {notification} + +noMsgFlags :: MsgFlags +noMsgFlags = MsgFlags {notification = False} + -- * SMP command tags data CommandTag (p :: Party) where @@ -246,6 +265,7 @@ data CommandTag (p :: Party) where SUB_ :: CommandTag Recipient KEY_ :: CommandTag Recipient NKEY_ :: CommandTag Recipient + GET_ :: CommandTag Recipient ACK_ :: CommandTag Recipient OFF_ :: CommandTag Recipient DEL_ :: CommandTag Recipient @@ -284,6 +304,7 @@ instance PartyI p => Encoding (CommandTag p) where SUB_ -> "SUB" KEY_ -> "KEY" NKEY_ -> "NKEY" + GET_ -> "GET" ACK_ -> "ACK" OFF_ -> "OFF" DEL_ -> "DEL" @@ -298,6 +319,7 @@ instance ProtocolMsgTag CmdTag where "SUB" -> Just $ CT SRecipient SUB_ "KEY" -> Just $ CT SRecipient KEY_ "NKEY" -> Just $ CT SRecipient NKEY_ + "GET" -> Just $ CT SRecipient GET_ "ACK" -> Just $ CT SRecipient ACK_ "OFF" -> Just $ CT SRecipient OFF_ "DEL" -> Just $ CT SRecipient DEL_ @@ -526,6 +548,8 @@ data CommandError UNKNOWN | -- | error parsing command SYNTAX + | -- | command is not allowed (SUB/GET cannot be used with the same queue in the same TCP connection) + PROHIBITED | -- | transmission has no required credentials (signature or queue ID) NO_AUTH | -- | transmission has credentials that are not allowed for this command @@ -572,28 +596,31 @@ instance Protocol BrokerMsg where class ProtocolMsgTag (Tag msg) => ProtocolEncoding msg where type Tag msg - encodeProtocol :: msg -> ByteString - protocolP :: Tag msg -> Parser msg + encodeProtocol :: Version -> msg -> ByteString + protocolP :: Version -> Tag msg -> Parser msg checkCredentials :: SignedRawTransmission -> msg -> Either ErrorType msg instance PartyI p => ProtocolEncoding (Command p) where type Tag (Command p) = CommandTag p - encodeProtocol = \case + encodeProtocol v = \case NEW rKey dhKey -> e (NEW_, ' ', rKey, dhKey) SUB -> e SUB_ KEY k -> e (KEY_, ' ', k) NKEY k -> e (NKEY_, ' ', k) + GET -> e GET_ ACK -> e ACK_ OFF -> e OFF_ DEL -> e DEL_ - SEND msg -> e (SEND_, ' ', Tail msg) + SEND flags msg + | v == 1 -> e (SEND_, ' ', Tail msg) + | otherwise -> e (SEND_, ' ', flags, ' ', Tail msg) PING -> e PING_ NSUB -> e NSUB_ where e :: Encoding a => a -> ByteString e = smpEncode - protocolP tag = (\(Cmd _ c) -> checkParty c) <$?> protocolP (CT (sParty @p) tag) + protocolP v tag = (\(Cmd _ c) -> checkParty c) <$?> protocolP v (CT (sParty @p) tag) checkCredentials (sig, _, queueId, _) cmd = case cmd of -- NEW must have signature but NOT queue ID @@ -602,7 +629,7 @@ instance PartyI p => ProtocolEncoding (Command p) where | not (B.null queueId) -> Left $ CMD HAS_AUTH | otherwise -> Right cmd -- SEND must have queue ID, signature is not always required - SEND _ + SEND {} | B.null queueId -> Left $ CMD NO_ENTITY | otherwise -> Right cmd -- PING must not have queue ID or signature @@ -616,21 +643,24 @@ instance PartyI p => ProtocolEncoding (Command p) where instance ProtocolEncoding Cmd where type Tag Cmd = CmdTag - encodeProtocol (Cmd _ c) = encodeProtocol c + encodeProtocol v (Cmd _ c) = encodeProtocol v c - protocolP = \case + protocolP v = \case CT SRecipient tag -> Cmd SRecipient <$> case tag of NEW_ -> NEW <$> _smpP <*> smpP SUB_ -> pure SUB KEY_ -> KEY <$> _smpP NKEY_ -> NKEY <$> _smpP + GET_ -> pure GET ACK_ -> pure ACK OFF_ -> pure OFF DEL_ -> pure DEL CT SSender tag -> Cmd SSender <$> case tag of - SEND_ -> SEND . unTail <$> _smpP + SEND_ + | v == 1 -> SEND <$> pure noMsgFlags <*> (unTail <$> _smpP) + | otherwise -> SEND <$> _smpP <*> (unTail <$> _smpP) PING_ -> pure PING CT SNotifier NSUB_ -> pure $ Cmd SNotifier NSUB @@ -638,9 +668,11 @@ instance ProtocolEncoding Cmd where instance ProtocolEncoding BrokerMsg where type Tag BrokerMsg = BrokerMsgTag - encodeProtocol = \case + encodeProtocol v = \case IDS (QIK rcvId sndId srvDh) -> e (IDS_, ' ', rcvId, sndId, srvDh) - MSG msgId ts msgBody -> e (MSG_, ' ', msgId, ts, Tail msgBody) + MSG msgId ts flags msgBody + | v == 1 -> e (MSG_, ' ', msgId, ts, Tail msgBody) + | otherwise -> e (MSG_, ' ', msgId, ts, flags, ' ', Tail msgBody) NID nId -> e (NID_, ' ', nId) NMSG -> e NMSG_ END -> e END_ @@ -651,8 +683,10 @@ instance ProtocolEncoding BrokerMsg where e :: Encoding a => a -> ByteString e = smpEncode - protocolP = \case - MSG_ -> MSG <$> _smpP <*> smpP <*> (unTail <$> smpP) + protocolP v = \case + MSG_ + | v == 1 -> MSG <$> _smpP <*> smpP <*> pure noMsgFlags <*> (unTail <$> smpP) + | otherwise -> MSG <$> _smpP <*> smpP <*> smpP <*> (unTail <$> _smpP) IDS_ -> IDS <$> (QIK <$> _smpP <*> smpP <*> smpP) NID_ -> NID <$> _smpP NMSG_ -> pure NMSG @@ -679,11 +713,11 @@ _smpP :: Encoding a => Parser a _smpP = A.space *> smpP -- | Parse SMP protocol commands and broker messages -parseProtocol :: ProtocolEncoding msg => ByteString -> Either ErrorType msg -parseProtocol s = +parseProtocol :: ProtocolEncoding msg => Version -> ByteString -> Either ErrorType msg +parseProtocol v s = let (tag, params) = B.break (== ' ') s in case decodeTag tag of - Just cmd -> parse (protocolP cmd) (CMD SYNTAX) params + Just cmd -> parse (protocolP v cmd) (CMD SYNTAX) params Nothing -> Left $ CMD UNKNOWN checkParty :: forall t p p'. (PartyI p, PartyI p') => t p' -> Either String (t p) @@ -725,6 +759,7 @@ instance Encoding CommandError where smpEncode e = case e of UNKNOWN -> "UNKNOWN" SYNTAX -> "SYNTAX" + PROHIBITED -> "PROHIBITED" NO_AUTH -> "NO_AUTH" HAS_AUTH -> "HAS_AUTH" NO_ENTITY -> "NO_ENTITY" @@ -732,6 +767,7 @@ instance Encoding CommandError where A.takeTill (== ' ') >>= \case "UNKNOWN" -> pure UNKNOWN "SYNTAX" -> pure SYNTAX + "PROHIBITED" -> pure PROHIBITED "NO_AUTH" -> pure NO_AUTH "HAS_AUTH" -> pure HAS_AUTH "NO_ENTITY" -> pure NO_ENTITY @@ -742,9 +778,9 @@ instance Encoding CommandError where tPut :: Transport c => THandle c -> SentRawTransmission -> IO (Either TransportError ()) tPut th (sig, t) = tPutBlock th $ smpEncode (C.signatureBytes sig) <> t -encodeTransmission :: ProtocolEncoding c => ByteString -> Transmission c -> ByteString -encodeTransmission sessionId (CorrId corrId, queueId, command) = - smpEncode (sessionId, corrId, queueId) <> encodeProtocol command +encodeTransmission :: ProtocolEncoding c => Version -> ByteString -> Transmission c -> ByteString +encodeTransmission v sessionId (CorrId corrId, queueId, command) = + smpEncode (sessionId, corrId, queueId) <> encodeProtocol v command -- | Receive and parse transmission from the TCP transport (ignoring any trailing padding). tGetParse :: Transport c => THandle c -> IO (Either TransportError RawTransmission) @@ -752,7 +788,7 @@ tGetParse th = (parse transmissionP TEBadBlock =<<) <$> tGetBlock th -- | Receive client and server transmissions (determined by `cmd` type). tGet :: forall cmd c m. (ProtocolEncoding cmd, Transport c, MonadIO m) => THandle c -> m (SignedTransmission cmd) -tGet th@THandle {sessionId} = liftIO (tGetParse th) >>= decodeParseValidate +tGet th@THandle {sessionId, thVersion = v} = liftIO (tGetParse th) >>= decodeParseValidate where decodeParseValidate :: Either TransportError RawTransmission -> m (SignedTransmission cmd) decodeParseValidate = \case @@ -768,5 +804,5 @@ tGet th@THandle {sessionId} = liftIO (tGetParse th) >>= decodeParseValidate tParseValidate :: ByteString -> SignedRawTransmission -> m (SignedTransmission cmd) tParseValidate signed t@(sig, corrId, entityId, command) = do - let cmd = parseProtocol command >>= checkCredentials t + let cmd = parseProtocol v command >>= checkCredentials t pure (sig, signed, (CorrId corrId, entityId, cmd)) diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index aac1db5ee..bda7261ad 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -227,10 +227,10 @@ receive th Client {rcvQ, sndQ, activeAt} = forever $ do write q t = atomically $ writeTBQueue q t send :: (Transport c, MonadUnliftIO m) => THandle c -> Client -> m () -send h Client {sndQ, sessionId, activeAt} = forever $ do +send h@THandle {thVersion = v} Client {sndQ, sessionId, activeAt} = forever $ do t <- atomically $ readTBQueue sndQ -- TODO the line below can return Left, but we ignore it and do not disconnect the client - void . liftIO $ tPut h (Nothing, encodeTransmission sessionId t) + void . liftIO $ tPut h (Nothing, encodeTransmission v sessionId t) atomically . writeTVar activeAt =<< liftIO getSystemTime disconnectTransport :: (Transport c, MonadUnliftIO m) => THandle c -> client -> (client -> TVar SystemTime) -> ExpirationConfig -> m () @@ -248,7 +248,7 @@ verifyTransmission sig_ signed queueId cmd = do case cmd of Cmd SRecipient (NEW k _) -> pure $ verifyCmdSignature sig_ signed k Cmd SRecipient _ -> verifyCmd SRecipient $ verifyCmdSignature sig_ signed . recipientKey - Cmd SSender (SEND _) -> verifyCmd SSender $ verifyMaybe . senderKey + Cmd SSender SEND {} -> verifyCmd SSender $ verifyMaybe . senderKey Cmd SSender PING -> pure True Cmd SNotifier NSUB -> verifyCmd SNotifier $ verifyMaybe . fmap snd . notifier where @@ -298,7 +298,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri case cmd of Cmd SSender command -> case command of - SEND msgBody -> sendMessage st msgBody + SEND flags msgBody -> sendMessage st flags msgBody PING -> pure (corrId, "", PONG) Cmd SNotifier NSUB -> subscribeNotifications Cmd SRecipient command -> @@ -309,6 +309,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri (createQueue st rKey dhKey) (pure (corrId, queueId, ERR AUTH)) SUB -> subscribeQueue queueId + GET -> getMessage ACK -> acknowledgeMsg KEY sKey -> secureQueue_ st sKey NKEY nKey -> addQueueNotifier_ st nKey @@ -386,17 +387,42 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri subscribeQueue :: RecipientId -> m (Transmission BrokerMsg) subscribeQueue rId = - atomically (getSubscription rId) >>= deliverMessage tryPeekMsg rId + atomically (getSubscription rId) >>= \case + Just s -> deliverMessage tryPeekMsg rId s + -- cannot use SUB in the same connection where GET was used + _ -> pure (corrId, rId, ERR $ CMD PROHIBITED) - getSubscription :: RecipientId -> STM Sub + getSubscription :: RecipientId -> STM (Maybe Sub) getSubscription rId = do TM.lookup rId subscriptions >>= \case - Just s -> tryTakeTMVar (delivered s) $> s + Just Sub {subThread = ProhibitSub} -> pure Nothing + Just s -> tryTakeTMVar (delivered s) $> Just s Nothing -> do writeTBQueue subscribedQ (rId, clnt) s <- newSubscription TM.insert rId s subscriptions - return s + pure $ Just s + + getMessage :: m (Transmission BrokerMsg) + getMessage = + atomically getProhibitedSub >>= \case + Just s -> do + q <- getStoreMsgQueue queueId + atomically $ + tryPeekMsg q >>= \case + Just msg -> tryPutTMVar (delivered s) () $> (corrId, queueId, msgCmd msg) + _ -> pure (corrId, queueId, ERR NO_MSG) + _ -> pure (corrId, queueId, ERR $ CMD PROHIBITED) -- cannot use GET in the same connection where there is an active subscription + where + getProhibitedSub :: STM (Maybe Sub) + getProhibitedSub = + TM.lookup queueId subscriptions >>= \case + Just s@Sub {subThread = ProhibitSub} -> tryTakeTMVar (delivered s) $> Just s + Just _ -> pure Nothing + Nothing -> do + s <- prohibitedSubscription + TM.insert queueId s subscriptions + pure $ Just s subscribeNotifications :: m (Transmission BrokerMsg) subscribeNotifications = atomically $ do @@ -413,14 +439,18 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri stats <- asks serverStats atomically $ modifyTVar (msgRecv stats) (+ 1) atomically $ modifyTVar (msgQueues stats) (S.insert queueId) - deliverMessage tryDelPeekMsg queueId s + case s of + Sub {subThread = ProhibitSub} -> + (getStoreMsgQueue queueId >>= atomically . tryDelMsg) $> ok + _ -> + deliverMessage tryDelPeekMsg queueId s _ -> return $ err NO_MSG withSub :: RecipientId -> (Sub -> STM a) -> STM (Maybe a) withSub rId f = mapM f =<< TM.lookup rId subscriptions - sendMessage :: QueueStore -> MsgBody -> m (Transmission BrokerMsg) - sendMessage st msgBody + sendMessage :: QueueStore -> MsgFlags -> MsgBody -> m (Transmission BrokerMsg) + sendMessage st flags msgBody | B.length msgBody > maxMessageLength = pure $ err LARGE_MSG | otherwise = do qr <- atomically $ getQueue st SSender queueId @@ -454,7 +484,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri msgId <- randomId =<< asks (msgIdBytes . config) ts <- liftIO getSystemTime let c = C.cbEncrypt (rcvDhSecret qr) (C.cbNonce msgId) msgBody (maxMessageLength + 2) - pure $ Message msgId ts <$> c + pure $ Message msgId ts flags <$> c trySendNotification :: STM () trySendNotification = @@ -469,9 +499,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri deliverMessage :: (MsgQueue -> STM (Maybe Message)) -> RecipientId -> Sub -> m (Transmission BrokerMsg) deliverMessage tryPeek rId = \case Sub {subThread = NoSub} -> do - ms <- asks msgStore - quota <- asks $ msgQueueQuota . config - q <- atomically $ getMsgQueue ms rId quota + q <- getStoreMsgQueue rId atomically (tryPeek q) >>= \case Nothing -> forkSub q $> ok Just msg -> atomically setDelivered $> (corrId, rId, msgCmd msg) @@ -498,8 +526,14 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscri setDelivered :: STM (Maybe Bool) setDelivered = withSub rId $ \s -> tryPutTMVar (delivered s) () - msgCmd :: Message -> BrokerMsg - msgCmd Message {msgId, ts, msgBody} = MSG msgId ts msgBody + getStoreMsgQueue :: RecipientId -> m MsgQueue + getStoreMsgQueue rId = do + ms <- asks msgStore + quota <- asks $ msgQueueQuota . config + atomically $ getMsgQueue ms rId quota + + msgCmd :: Message -> BrokerMsg + msgCmd Message {msgId, ts, msgFlags, msgBody} = MSG msgId ts msgFlags msgBody delQueueAndMsgs :: QueueStore -> m (Transmission BrokerMsg) delQueueAndMsgs st = do diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index d70fe203f..3bec11ee4 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -113,7 +113,7 @@ data ServerStats = ServerStats fromTime :: TVar UTCTime } -data SubscriptionThread = NoSub | SubPending | SubThread ThreadId +data SubscriptionThread = NoSub | SubPending | SubThread ThreadId | ProhibitSub data Sub = Sub { subThread :: SubscriptionThread, @@ -150,9 +150,15 @@ newServerStats ts = do pure ServerStats {qCreated, qSecured, qDeleted, msgSent, msgRecv, msgQueues, fromTime} newSubscription :: STM Sub -newSubscription = do +newSubscription = newSubscription_ NoSub + +prohibitedSubscription :: STM Sub +prohibitedSubscription = newSubscription_ ProhibitSub + +newSubscription_ :: SubscriptionThread -> STM Sub +newSubscription_ subThread = do delivered <- newEmptyTMVar - return Sub {subThread = NoSub, delivered} + return Sub {subThread, delivered} newEnv :: forall m. (MonadUnliftIO m, MonadRandom m) => ServerConfig -> m Env newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile, storeLogFile} = do diff --git a/src/Simplex/Messaging/Server/MsgStore.hs b/src/Simplex/Messaging/Server/MsgStore.hs index a7180311f..c4f36acb5 100644 --- a/src/Simplex/Messaging/Server/MsgStore.hs +++ b/src/Simplex/Messaging/Server/MsgStore.hs @@ -5,11 +5,12 @@ module Simplex.Messaging.Server.MsgStore where import Data.Int (Int64) import Data.Time.Clock.System (SystemTime) import Numeric.Natural -import Simplex.Messaging.Protocol (MsgBody, MsgId, RecipientId) +import Simplex.Messaging.Protocol (MsgBody, MsgFlags, MsgId, RecipientId) data Message = Message { msgId :: MsgId, ts :: SystemTime, + msgFlags :: MsgFlags, msgBody :: MsgBody } @@ -22,5 +23,6 @@ class MonadMsgQueue q m where writeMsg :: q -> Message -> m () -- non blocking tryPeekMsg :: q -> m (Maybe Message) -- non blocking peekMsg :: q -> m Message -- blocking + tryDelMsg :: q -> m () -- non blocking tryDelPeekMsg :: q -> m (Maybe Message) -- atomic delete (== read) last and peek next message, if available deleteExpiredMsgs :: q -> Int64 -> m () diff --git a/src/Simplex/Messaging/Server/MsgStore/STM.hs b/src/Simplex/Messaging/Server/MsgStore/STM.hs index be395f918..06048b8e1 100644 --- a/src/Simplex/Messaging/Server/MsgStore/STM.hs +++ b/src/Simplex/Messaging/Server/MsgStore/STM.hs @@ -7,7 +7,7 @@ module Simplex.Messaging.Server.MsgStore.STM where -import Control.Monad (when) +import Control.Monad (void, when) import Data.Int (Int64) import Data.Time.Clock.System (SystemTime (systemSeconds)) import Numeric.Natural @@ -49,6 +49,9 @@ instance MonadMsgQueue MsgQueue STM where peekMsg :: MsgQueue -> STM Message peekMsg = peekTBQueue . msgQueue + tryDelMsg :: MsgQueue -> STM () + tryDelMsg = void . tryReadTBQueue . msgQueue + -- atomic delete (== read) last and peek next message if available tryDelPeekMsg :: MsgQueue -> STM (Maybe Message) tryDelPeekMsg (MsgQueue q) = tryReadTBQueue q >> tryPeekTBQueue q diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index 9f58e5441..ac7f34c9a 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -93,7 +93,7 @@ smpBlockSize :: Int smpBlockSize = 16384 supportedSMPVersions :: VersionRange -supportedSMPVersions = mkVersionRange 1 1 +supportedSMPVersions = mkVersionRange 1 2 simplexMQVersion :: String simplexMQVersion = "2.1.1" diff --git a/src/Simplex/Messaging/Util.hs b/src/Simplex/Messaging/Util.hs index 24291773b..8940f239b 100644 --- a/src/Simplex/Messaging/Util.hs +++ b/src/Simplex/Messaging/Util.hs @@ -78,3 +78,7 @@ catchAll = E.catch catchAll_ :: IO a -> IO a -> IO a catchAll_ a = catchAll a . const {-# INLINE catchAll_ #-} + +eitherToMaybe :: Either a b -> Maybe b +eitherToMaybe = either (const Nothing) Just +{-# INLINE eitherToMaybe #-} diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index 36cfc30fb..318b9e9de 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -117,7 +117,7 @@ h #:# err = tryGet `shouldReturn` () _ -> return () pattern Msg :: MsgBody -> ACommand 'Agent -pattern Msg msgBody <- MSG MsgMeta {integrity = MsgOk} msgBody +pattern Msg msgBody <- MSG MsgMeta {integrity = MsgOk} _ msgBody testDuplexConnection :: Transport c => TProxy c -> c -> c -> IO () testDuplexConnection _ alice bob = do @@ -130,24 +130,24 @@ testDuplexConnection _ alice bob = do bob <# ("", "alice", CON) alice <# ("", "bob", CON) -- message IDs 1 to 3 get assigned to control messages, so first MSG is assigned ID 4 - alice #: ("3", "bob", "SEND :hello") #> ("3", "bob", MID 5) + alice #: ("3", "bob", "SEND F :hello") #> ("3", "bob", MID 5) alice <# ("", "bob", SENT 5) bob <#= \case ("", "alice", Msg "hello") -> True; _ -> False bob #: ("12", "alice", "ACK 5") #> ("12", "alice", OK) - alice #: ("4", "bob", "SEND :how are you?") #> ("4", "bob", MID 6) + alice #: ("4", "bob", "SEND F :how are you?") #> ("4", "bob", MID 6) alice <# ("", "bob", SENT 6) bob <#= \case ("", "alice", Msg "how are you?") -> True; _ -> False bob #: ("13", "alice", "ACK 6") #> ("13", "alice", OK) - bob #: ("14", "alice", "SEND 9\nhello too") #> ("14", "alice", MID 7) + bob #: ("14", "alice", "SEND F 9\nhello too") #> ("14", "alice", MID 7) bob <# ("", "alice", SENT 7) alice <#= \case ("", "bob", Msg "hello too") -> True; _ -> False alice #: ("3a", "bob", "ACK 7") #> ("3a", "bob", OK) - bob #: ("15", "alice", "SEND 9\nmessage 1") #> ("15", "alice", MID 8) + bob #: ("15", "alice", "SEND F 9\nmessage 1") #> ("15", "alice", MID 8) bob <# ("", "alice", SENT 8) alice <#= \case ("", "bob", Msg "message 1") -> True; _ -> False alice #: ("4a", "bob", "ACK 8") #> ("4a", "bob", OK) alice #: ("5", "bob", "OFF") #> ("5", "bob", OK) - bob #: ("17", "alice", "SEND 9\nmessage 3") #> ("17", "alice", MID 9) + bob #: ("17", "alice", "SEND F 9\nmessage 3") #> ("17", "alice", MID 9) bob <# ("", "alice", MERR 9 (SMP AUTH)) alice #: ("6", "bob", "DEL") #> ("6", "bob", OK) alice #:# "nothing else should be delivered to alice" @@ -163,24 +163,24 @@ testDuplexConnRandomIds _ alice bob = do bob <# ("", aliceConn, INFO "alice's connInfo") bob <# ("", aliceConn, CON) alice <# ("", bobConn, CON) - alice #: ("2", bobConn, "SEND :hello") #> ("2", bobConn, MID 5) + alice #: ("2", bobConn, "SEND F :hello") #> ("2", bobConn, MID 5) alice <# ("", bobConn, SENT 5) bob <#= \case ("", c, Msg "hello") -> c == aliceConn; _ -> False bob #: ("12", aliceConn, "ACK 5") #> ("12", aliceConn, OK) - alice #: ("3", bobConn, "SEND :how are you?") #> ("3", bobConn, MID 6) + alice #: ("3", bobConn, "SEND F :how are you?") #> ("3", bobConn, MID 6) alice <# ("", bobConn, SENT 6) bob <#= \case ("", c, Msg "how are you?") -> c == aliceConn; _ -> False bob #: ("13", aliceConn, "ACK 6") #> ("13", aliceConn, OK) - bob #: ("14", aliceConn, "SEND 9\nhello too") #> ("14", aliceConn, MID 7) + bob #: ("14", aliceConn, "SEND F 9\nhello too") #> ("14", aliceConn, MID 7) bob <# ("", aliceConn, SENT 7) alice <#= \case ("", c, Msg "hello too") -> c == bobConn; _ -> False alice #: ("3a", bobConn, "ACK 7") #> ("3a", bobConn, OK) - bob #: ("15", aliceConn, "SEND 9\nmessage 1") #> ("15", aliceConn, MID 8) + bob #: ("15", aliceConn, "SEND F 9\nmessage 1") #> ("15", aliceConn, MID 8) bob <# ("", aliceConn, SENT 8) alice <#= \case ("", c, Msg "message 1") -> c == bobConn; _ -> False alice #: ("4a", bobConn, "ACK 8") #> ("4a", bobConn, OK) alice #: ("5", bobConn, "OFF") #> ("5", bobConn, OK) - bob #: ("17", aliceConn, "SEND 9\nmessage 3") #> ("17", aliceConn, MID 9) + bob #: ("17", aliceConn, "SEND F 9\nmessage 3") #> ("17", aliceConn, MID 9) bob <# ("", aliceConn, MERR 9 (SMP AUTH)) alice #: ("6", bobConn, "DEL") #> ("6", bobConn, OK) alice #:# "nothing else should be delivered to alice" @@ -198,7 +198,7 @@ testContactConnection _ alice bob tom = do alice <# ("", "bob", INFO "bob's connInfo 2") alice <# ("", "bob", CON) bob <# ("", "alice", CON) - alice #: ("3", "bob", "SEND :hi") #> ("3", "bob", MID 5) + alice #: ("3", "bob", "SEND F :hi") #> ("3", "bob", MID 5) alice <# ("", "bob", SENT 5) bob <#= \case ("", "alice", Msg "hi") -> True; _ -> False bob #: ("13", "alice", "ACK 5") #> ("13", "alice", OK) @@ -211,7 +211,7 @@ testContactConnection _ alice bob tom = do alice <# ("", "tom", INFO "tom's connInfo 2") alice <# ("", "tom", CON) tom <# ("", "alice", CON) - alice #: ("5", "tom", "SEND :hi there") #> ("5", "tom", MID 5) + alice #: ("5", "tom", "SEND F :hi there") #> ("5", "tom", MID 5) alice <# ("", "tom", SENT 5) tom <#= \case ("", "alice", Msg "hi there") -> True; _ -> False tom #: ("23", "alice", "ACK 5") #> ("23", "alice", OK) @@ -234,7 +234,7 @@ testContactConnRandomIds _ alice bob = do alice <# ("", bobConn, CON) bob <# ("", aliceConn, CON) - alice #: ("3", bobConn, "SEND :hi") #> ("3", bobConn, MID 5) + alice #: ("3", bobConn, "SEND F :hi") #> ("3", bobConn, MID 5) alice <# ("", bobConn, SENT 5) bob <#= \case ("", c, Msg "hi") -> c == aliceConn; _ -> False bob #: ("13", aliceConn, "ACK 5") #> ("13", aliceConn, OK) @@ -254,17 +254,17 @@ testRejectContactRequest _ alice bob = do testSubscription :: Transport c => TProxy c -> c -> c -> c -> IO () testSubscription _ alice1 alice2 bob = do (alice1, "alice") `connect` (bob, "bob") - bob #: ("12", "alice", "SEND 5\nhello") #> ("12", "alice", MID 5) + bob #: ("12", "alice", "SEND F 5\nhello") #> ("12", "alice", MID 5) bob <# ("", "alice", SENT 5) alice1 <#= \case ("", "bob", Msg "hello") -> True; _ -> False alice1 #: ("1", "bob", "ACK 5") #> ("1", "bob", OK) - bob #: ("13", "alice", "SEND 11\nhello again") #> ("13", "alice", MID 6) + bob #: ("13", "alice", "SEND F 11\nhello again") #> ("13", "alice", MID 6) bob <# ("", "alice", SENT 6) alice1 <#= \case ("", "bob", Msg "hello again") -> True; _ -> False alice1 #: ("2", "bob", "ACK 6") #> ("2", "bob", OK) alice2 #: ("21", "bob", "SUB") #> ("21", "bob", OK) alice1 <# ("", "bob", END) - bob #: ("14", "alice", "SEND 2\nhi") #> ("14", "alice", MID 7) + bob #: ("14", "alice", "SEND F 2\nhi") #> ("14", "alice", MID 7) bob <# ("", "alice", SENT 7) alice2 <#= \case ("", "bob", Msg "hi") -> True; _ -> False alice2 #: ("22", "bob", "ACK 7") #> ("22", "bob", OK) @@ -283,7 +283,7 @@ testMsgDeliveryServerRestart :: Transport c => TProxy c -> c -> c -> IO () testMsgDeliveryServerRestart t alice bob = do withServer $ do connect (alice, "alice") (bob, "bob") - bob #: ("1", "alice", "SEND 2\nhi") #> ("1", "alice", MID 5) + bob #: ("1", "alice", "SEND F 2\nhi") #> ("1", "alice", MID 5) bob <# ("", "alice", SENT 5) alice <#= \case ("", "bob", Msg "hi") -> True; _ -> False alice #: ("11", "bob", "ACK 5") #> ("11", "bob", OK) @@ -291,7 +291,7 @@ testMsgDeliveryServerRestart t alice bob = do let server = (SMPServer "localhost" testPort2 testKeyHash) alice <# ("", "", DOWN server ["bob"]) - bob #: ("2", "alice", "SEND 11\nhello again") #> ("2", "alice", MID 6) + bob #: ("2", "alice", "SEND F 11\nhello again") #> ("2", "alice", MID 6) bob #:# "nothing else delivered before the server is restarted" alice #:# "nothing else delivered before the server is restarted" @@ -314,7 +314,7 @@ testServerConnectionAfterError t _ = do bob <# ("", "", DOWN server ["alice"]) alice <# ("", "", DOWN server ["bob"]) - alice #: ("1", "bob", "SEND 5\nhello") #> ("1", "bob", MID 5) + alice #: ("1", "bob", "SEND F 5\nhello") #> ("1", "bob", MID 5) alice #:# "nothing else delivered before the server is restarted" bob #:# "nothing else delivered before the server is restarted" @@ -328,7 +328,7 @@ testServerConnectionAfterError t _ = do bob <#= \case ("", "alice", Msg "hello") -> True; _ -> False bob #: ("2", "alice", "ACK 5") #> ("2", "alice", OK) alice <# ("", "", UP server ["bob"]) - alice #: ("1", "bob", "SEND 11\nhello again") #> ("1", "bob", MID 6) + alice #: ("1", "bob", "SEND F 11\nhello again") #> ("1", "bob", MID 6) alice <# ("", "bob", SENT 6) bob <#= \case ("", "alice", Msg "hello again") -> True; _ -> False @@ -349,14 +349,14 @@ testMsgDeliveryAgentRestart t bob = do withAgent $ \alice -> do withServer $ do connect (bob, "bob") (alice, "alice") - alice #: ("1", "bob", "SEND 5\nhello") #> ("1", "bob", MID 5) + alice #: ("1", "bob", "SEND F 5\nhello") #> ("1", "bob", MID 5) alice <# ("", "bob", SENT 5) bob <#= \case ("", "alice", Msg "hello") -> True; _ -> False bob #: ("11", "alice", "ACK 5") #> ("11", "alice", OK) bob #:# "nothing else delivered before the server is down" bob <# ("", "", DOWN server ["alice"]) - alice #: ("2", "bob", "SEND 11\nhello again") #> ("2", "bob", MID 6) + alice #: ("2", "bob", "SEND F 11\nhello again") #> ("2", "bob", MID 6) alice #:# "nothing else delivered before the server is restarted" bob #:# "nothing else delivered before the server is restarted" @@ -394,11 +394,11 @@ testConcurrentMsgDelivery _ alice bob = do -- the first connection should not be blocked by the second one sendMessage (alice, "alice") (bob, "bob") "hello" - -- alice #: ("2", "bob", "SEND :hello") #> ("2", "bob", MID 1) + -- alice #: ("2", "bob", "SEND F :hello") #> ("2", "bob", MID 1) -- alice <# ("", "bob", SENT 1) -- bob <#= \case ("", "alice", Msg "hello") -> True; _ -> False -- bob #: ("12", "alice", "ACK 1") #> ("12", "alice", OK) - bob #: ("14", "alice", "SEND 9\nhello too") #> ("14", "alice", MID 6) + bob #: ("14", "alice", "SEND F 9\nhello too") #> ("14", "alice", MID 6) bob <# ("", "alice", SENT 6) -- if delivery is blocked it won't go further alice <#= \case ("", "bob", Msg "hello too") -> True; _ -> False @@ -411,11 +411,11 @@ testMsgDeliveryQuotaExceeded _ alice bob = do forM_ [1 .. 4 :: Int] $ \i -> do let corrId = bshow i msg = "message " <> bshow i - (_, "bob", Right (MID mId)) <- alice #: (corrId, "bob", "SEND :" <> msg) + (_, "bob", Right (MID mId)) <- alice #: (corrId, "bob", "SEND F :" <> msg) alice <#= \case ("", "bob", SENT m) -> m == mId; _ -> False - (_, "bob", Right (MID _)) <- alice #: ("5", "bob", "SEND :over quota") + (_, "bob", Right (MID _)) <- alice #: ("5", "bob", "SEND F :over quota") - alice #: ("1", "bob2", "SEND :hello") #> ("1", "bob2", MID 5) + alice #: ("1", "bob2", "SEND F :hello") #> ("1", "bob2", MID 5) -- if delivery is blocked it won't go further alice <# ("", "bob2", SENT 5) @@ -432,10 +432,10 @@ connect (h1, name1) (h2, name2) = do sendMessage :: Transport c => (c, ConnId) -> (c, ConnId) -> ByteString -> IO () sendMessage (h1, name1) (h2, name2) msg = do - ("m1", name2', Right (MID mId)) <- h1 #: ("m1", name2, "SEND :" <> msg) + ("m1", name2', Right (MID mId)) <- h1 #: ("m1", name2, "SEND F :" <> msg) name2' `shouldBe` name2 h1 <#= \case ("", n, SENT m) -> n == name2 && m == mId; _ -> False - ("", name1', Right (MSG MsgMeta {recipient = (msgId', _)} msg')) <- (h2 <#:) + ("", name1', Right (MSG MsgMeta {recipient = (msgId', _)} _ msg')) <- (h2 <#:) name1' `shouldBe` name1 msg' `shouldBe` msg h2 #: ("m2", name1, "ACK " <> bshow msgId') =#> \case ("m2", n, OK) -> n == name1; _ -> False diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index a0195de23..43925c830 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -18,6 +18,7 @@ import Simplex.Messaging.Agent import Simplex.Messaging.Agent.Env.SQLite (AgentConfig (..)) import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Protocol (ErrorType (..), MsgBody) +import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Server.Env.STM (ServerConfig (..)) import Simplex.Messaging.Server.Expiration import Simplex.Messaging.Transport (ATransport (..)) @@ -34,7 +35,7 @@ get :: MonadIO m => AgentClient -> m (ATransmission 'Agent) get c = atomically (readTBQueue $ subQ c) pattern Msg :: MsgBody -> ACommand 'Agent -pattern Msg msgBody <- MSG MsgMeta {integrity = MsgOk} msgBody +pattern Msg msgBody <- MSG MsgMeta {integrity = MsgOk} _ msgBody functionalAPITests :: ATransport -> Spec functionalAPITests t = do @@ -71,24 +72,24 @@ testAgentClient = do get bob ##> ("", aliceId, INFO "alice's connInfo") get bob ##> ("", aliceId, CON) -- message IDs 1 to 4 get assigned to control messages, so first MSG is assigned ID 5 - 5 <- sendMessage alice bobId "hello" + 5 <- sendMessage alice bobId SMP.noMsgFlags "hello" get alice ##> ("", bobId, SENT 5) - 6 <- sendMessage alice bobId "how are you?" + 6 <- sendMessage alice bobId SMP.noMsgFlags "how are you?" get alice ##> ("", bobId, SENT 6) get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False ackMessage bob aliceId 5 get bob =##> \case ("", c, Msg "how are you?") -> c == aliceId; _ -> False ackMessage bob aliceId 6 - 7 <- sendMessage bob aliceId "hello too" + 7 <- sendMessage bob aliceId SMP.noMsgFlags "hello too" get bob ##> ("", aliceId, SENT 7) - 8 <- sendMessage bob aliceId "message 1" + 8 <- sendMessage bob aliceId SMP.noMsgFlags "message 1" get bob ##> ("", aliceId, SENT 8) get alice =##> \case ("", c, Msg "hello too") -> c == bobId; _ -> False ackMessage alice bobId 7 get alice =##> \case ("", c, Msg "message 1") -> c == bobId; _ -> False ackMessage alice bobId 8 suspendConnection alice bobId - 9 <- sendMessage bob aliceId "message 2" + 9 <- sendMessage bob aliceId SMP.noMsgFlags "message 2" get bob ##> ("", aliceId, MERR 9 (SMP AUTH)) deleteConnection alice bobId liftIO $ noMessages alice "nothing else should be delivered to alice" @@ -237,11 +238,11 @@ testActiveClientNotDisconnected t = do exchangeGreetings :: AgentClient -> ConnId -> AgentClient -> ConnId -> ExceptT AgentErrorType IO () exchangeGreetings alice bobId bob aliceId = do - 5 <- sendMessage alice bobId "hello" + 5 <- sendMessage alice bobId SMP.noMsgFlags "hello" get alice ##> ("", bobId, SENT 5) get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False ackMessage bob aliceId 5 - 6 <- sendMessage bob aliceId "hello too" + 6 <- sendMessage bob aliceId SMP.noMsgFlags "hello too" get bob ##> ("", aliceId, SENT 6) get alice =##> \case ("", c, Msg "hello too") -> c == bobId; _ -> False ackMessage alice bobId 6 diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index 28f39379b..4dd9f89bf 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -27,6 +27,7 @@ import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations import qualified Simplex.Messaging.Crypto as C +import qualified Simplex.Messaging.Protocol as SMP import System.Random import Test.Hspec import UnliftIO.Directory (removeFile) @@ -398,6 +399,7 @@ mkRcvMsgData internalId internalRcvId externalSndId brokerId internalHash = broker = (brokerId, ts) }, msgType = AM_A_MSG_, + msgFlags = SMP.noMsgFlags, msgBody = hw, internalHash, externalPrevSndHash = "hash_from_sender" @@ -427,6 +429,7 @@ mkSndMsgData internalId internalSndId internalHash = internalSndId, internalTs = ts, msgType = AM_A_MSG_, + msgFlags = SMP.noMsgFlags, msgBody = hw, internalHash, prevMsgHash = internalHash diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 49614d8e7..725688ba8 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -56,14 +56,14 @@ pattern Ids :: RecipientId -> SenderId -> RcvPublicDhKey -> BrokerMsg pattern Ids rId sId srvDh <- IDS (QIK rId sId srvDh) sendRecv :: forall c p. (Transport c, PartyI p) => THandle c -> (Maybe C.ASignature, ByteString, ByteString, Command p) -> IO (SignedTransmission BrokerMsg) -sendRecv h@THandle {sessionId} (sgn, corrId, qId, cmd) = do - let t = encodeTransmission sessionId (CorrId corrId, qId, cmd) +sendRecv h@THandle {thVersion, sessionId} (sgn, corrId, qId, cmd) = do + let t = encodeTransmission thVersion sessionId (CorrId corrId, qId, cmd) Right () <- tPut h (sgn, t) tGet h signSendRecv :: forall c p. (Transport c, PartyI p) => THandle c -> C.APrivateSignKey -> (ByteString, ByteString, Command p) -> IO (SignedTransmission BrokerMsg) -signSendRecv h@THandle {sessionId} pk (corrId, qId, cmd) = do - let t = encodeTransmission sessionId (CorrId corrId, qId, cmd) +signSendRecv h@THandle {thVersion, sessionId} pk (corrId, qId, cmd) = do + let t = encodeTransmission thVersion sessionId (CorrId corrId, qId, cmd) Right sig <- runExceptT $ C.sign pk t Right () <- tPut h (Just sig, t) tGet h @@ -71,6 +71,9 @@ signSendRecv h@THandle {sessionId} pk (corrId, qId, cmd) = do (#==) :: (HasCallStack, Eq a, Show a) => (a, a) -> String -> Assertion (actual, expected) #== message = assertEqual message expected actual +_SEND :: MsgBody -> Command 'Sender +_SEND = SEND noMsgFlags + testCreateSecure :: ATransport -> Spec testCreateSecure (ATransport t) = it "should create (NEW) and secure (KEY) queue" $ @@ -81,11 +84,11 @@ testCreateSecure (ATransport t) = let dec nonce = C.cbDecrypt (C.dh' srvDh dhPriv) (C.cbNonce nonce) (rId1, "") #== "creates queue" - Resp "bcda" sId1 ok1 <- sendRecv h ("", "bcda", sId, SEND "hello") + Resp "bcda" sId1 ok1 <- sendRecv h ("", "bcda", sId, _SEND "hello") (ok1, OK) #== "accepts unsigned SEND" (sId1, sId) #== "same queue ID in response 1" - Resp "" _ (MSG mId1 _ msg1) <- tGet h + Resp "" _ (MSG mId1 _ _ msg1) <- tGet h (dec mId1 msg1, Right "hello") #== "delivers message" Resp "cdab" _ ok4 <- signSendRecv h rKey ("cdab", rId, ACK) @@ -95,7 +98,7 @@ testCreateSecure (ATransport t) = (err6, ERR NO_MSG) #== "replies ERR when message acknowledged without messages" (sPub, sKey) <- C.generateSignatureKeyPair C.SEd448 - Resp "abcd" sId2 err1 <- signSendRecv h sKey ("abcd", sId, SEND "hello") + Resp "abcd" sId2 err1 <- signSendRecv h sKey ("abcd", sId, _SEND "hello") (err1, ERR AUTH) #== "rejects signed SEND" (sId2, sId) #== "same queue ID in response 2" @@ -112,16 +115,16 @@ testCreateSecure (ATransport t) = Resp "abcd" _ err4 <- signSendRecv h rKey ("abcd", rId, KEY sPub) (err4, ERR AUTH) #== "rejects KEY if already secured" - Resp "bcda" _ ok3 <- signSendRecv h sKey ("bcda", sId, SEND "hello again") + Resp "bcda" _ ok3 <- signSendRecv h sKey ("bcda", sId, _SEND "hello again") (ok3, OK) #== "accepts signed SEND" - Resp "" _ (MSG mId2 _ msg2) <- tGet h + Resp "" _ (MSG mId2 _ _ msg2) <- tGet h (dec mId2 msg2, Right "hello again") #== "delivers message 2" Resp "cdab" _ ok5 <- signSendRecv h rKey ("cdab", rId, ACK) (ok5, OK) #== "replies OK when message acknowledged 2" - Resp "dabc" _ err5 <- sendRecv h ("", "dabc", sId, SEND "hello") + Resp "dabc" _ err5 <- sendRecv h ("", "dabc", sId, _SEND "hello") (err5, ERR AUTH) #== "rejects unsigned SEND" testCreateDelete :: ATransport -> Spec @@ -138,13 +141,13 @@ testCreateDelete (ATransport t) = Resp "bcda" _ ok1 <- signSendRecv rh rKey ("bcda", rId, KEY sPub) (ok1, OK) #== "secures queue" - Resp "cdab" _ ok2 <- signSendRecv sh sKey ("cdab", sId, SEND "hello") + Resp "cdab" _ ok2 <- signSendRecv sh sKey ("cdab", sId, _SEND "hello") (ok2, OK) #== "accepts signed SEND" - Resp "dabc" _ ok7 <- signSendRecv sh sKey ("dabc", sId, SEND "hello 2") + Resp "dabc" _ ok7 <- signSendRecv sh sKey ("dabc", sId, _SEND "hello 2") (ok7, OK) #== "accepts signed SEND 2 - this message is not delivered because the first is not ACKed" - Resp "" _ (MSG mId1 _ msg1) <- tGet rh + Resp "" _ (MSG mId1 _ _ msg1) <- tGet rh (dec mId1 msg1, Right "hello") #== "delivers message" Resp "abcd" _ err1 <- sendRecv rh (sampleSig, "abcd", rId, OFF) @@ -157,16 +160,16 @@ testCreateDelete (ATransport t) = (ok3, OK) #== "suspends queue" (rId2, rId) #== "same queue ID in response 2" - Resp "dabc" _ err3 <- signSendRecv sh sKey ("dabc", sId, SEND "hello") + Resp "dabc" _ err3 <- signSendRecv sh sKey ("dabc", sId, _SEND "hello") (err3, ERR AUTH) #== "rejects signed SEND" - Resp "abcd" _ err4 <- sendRecv sh ("", "abcd", sId, SEND "hello") + Resp "abcd" _ err4 <- sendRecv sh ("", "abcd", sId, _SEND "hello") (err4, ERR AUTH) #== "reject unsigned SEND too" Resp "bcda" _ ok4 <- signSendRecv rh rKey ("bcda", rId, OFF) (ok4, OK) #== "accepts OFF when suspended" - Resp "cdab" _ (MSG mId2 _ msg2) <- signSendRecv rh rKey ("cdab", rId, SUB) + Resp "cdab" _ (MSG mId2 _ _ msg2) <- signSendRecv rh rKey ("cdab", rId, SUB) (dec mId2 msg2, Right "hello") #== "accepts SUB when suspended and delivers the message again (because was not ACKed)" Resp "dabc" _ err5 <- sendRecv rh (sampleSig, "dabc", rId, DEL) @@ -179,10 +182,10 @@ testCreateDelete (ATransport t) = (ok6, OK) #== "deletes queue" (rId3, rId) #== "same queue ID in response 3" - Resp "cdab" _ err7 <- signSendRecv sh sKey ("cdab", sId, SEND "hello") + Resp "cdab" _ err7 <- signSendRecv sh sKey ("cdab", sId, _SEND "hello") (err7, ERR AUTH) #== "rejects signed SEND when deleted" - Resp "dabc" _ err8 <- sendRecv sh ("", "dabc", sId, SEND "hello") + Resp "dabc" _ err8 <- sendRecv sh ("", "dabc", sId, _SEND "hello") (err8, ERR AUTH) #== "rejects unsigned SEND too when deleted" Resp "abcd" _ err11 <- signSendRecv rh rKey ("abcd", rId, ACK) @@ -232,10 +235,10 @@ testDuplex (ATransport t) = -- aSnd ID is passed to Bob out-of-band (bsPub, bsKey) <- C.generateSignatureKeyPair C.SEd448 - Resp "bcda" _ OK <- sendRecv bob ("", "bcda", aSnd, SEND $ "key " <> strEncode bsPub) + Resp "bcda" _ OK <- sendRecv bob ("", "bcda", aSnd, _SEND $ "key " <> strEncode bsPub) -- "key ..." is ad-hoc, not a part of SMP protocol - Resp "" _ (MSG mId1 _ msg1) <- tGet alice + Resp "" _ (MSG mId1 _ _ msg1) <- tGet alice Resp "cdab" _ OK <- signSendRecv alice arKey ("cdab", aRcv, ACK) Right ["key", bobKey] <- pure $ B.words <$> aDec mId1 msg1 (bobKey, strEncode bsPub) #== "key received from Bob" @@ -245,33 +248,33 @@ testDuplex (ATransport t) = (bDhPub, bDhPriv :: C.PrivateKeyX25519) <- C.generateKeyPair' Resp "abcd" _ (Ids bRcv bSnd bSrvDh) <- signSendRecv bob brKey ("abcd", "", NEW brPub bDhPub) let bDec nonce = C.cbDecrypt (C.dh' bSrvDh bDhPriv) (C.cbNonce nonce) - Resp "bcda" _ OK <- signSendRecv bob bsKey ("bcda", aSnd, SEND $ "reply_id " <> encode bSnd) + Resp "bcda" _ OK <- signSendRecv bob bsKey ("bcda", aSnd, _SEND $ "reply_id " <> encode bSnd) -- "reply_id ..." is ad-hoc, not a part of SMP protocol - Resp "" _ (MSG mId2 _ msg2) <- tGet alice + Resp "" _ (MSG mId2 _ _ msg2) <- tGet alice Resp "cdab" _ OK <- signSendRecv alice arKey ("cdab", aRcv, ACK) Right ["reply_id", bId] <- pure $ B.words <$> aDec mId2 msg2 (bId, encode bSnd) #== "reply queue ID received from Bob" (asPub, asKey) <- C.generateSignatureKeyPair C.SEd448 - Resp "dabc" _ OK <- sendRecv alice ("", "dabc", bSnd, SEND $ "key " <> strEncode asPub) + Resp "dabc" _ OK <- sendRecv alice ("", "dabc", bSnd, _SEND $ "key " <> strEncode asPub) -- "key ..." is ad-hoc, not a part of SMP protocol - Resp "" _ (MSG mId3 _ msg3) <- tGet bob + Resp "" _ (MSG mId3 _ _ msg3) <- tGet bob Resp "abcd" _ OK <- signSendRecv bob brKey ("abcd", bRcv, ACK) Right ["key", aliceKey] <- pure $ B.words <$> bDec mId3 msg3 (aliceKey, strEncode asPub) #== "key received from Alice" Resp "bcda" _ OK <- signSendRecv bob brKey ("bcda", bRcv, KEY asPub) - Resp "cdab" _ OK <- signSendRecv bob bsKey ("cdab", aSnd, SEND "hi alice") + Resp "cdab" _ OK <- signSendRecv bob bsKey ("cdab", aSnd, _SEND "hi alice") - Resp "" _ (MSG mId4 _ msg4) <- tGet alice + Resp "" _ (MSG mId4 _ _ msg4) <- tGet alice Resp "dabc" _ OK <- signSendRecv alice arKey ("dabc", aRcv, ACK) (aDec mId4 msg4, Right "hi alice") #== "message received from Bob" - Resp "abcd" _ OK <- signSendRecv alice asKey ("abcd", bSnd, SEND "how are you bob") + Resp "abcd" _ OK <- signSendRecv alice asKey ("abcd", bSnd, _SEND "how are you bob") - Resp "" _ (MSG mId5 _ msg5) <- tGet bob + Resp "" _ (MSG mId5 _ _ msg5) <- tGet bob Resp "bcda" _ OK <- signSendRecv bob brKey ("bcda", bRcv, ACK) (bDec mId5 msg5, Right "how are you bob") #== "message received from alice" @@ -283,26 +286,26 @@ testSwitchSub (ATransport t) = (dhPub, dhPriv :: C.PrivateKeyX25519) <- C.generateKeyPair' Resp "abcd" _ (Ids rId sId srvDh) <- signSendRecv rh1 rKey ("abcd", "", NEW rPub dhPub) let dec nonce = C.cbDecrypt (C.dh' srvDh dhPriv) (C.cbNonce nonce) - Resp "bcda" _ ok1 <- sendRecv sh ("", "bcda", sId, SEND "test1") + Resp "bcda" _ ok1 <- sendRecv sh ("", "bcda", sId, _SEND "test1") (ok1, OK) #== "sent test message 1" - Resp "cdab" _ ok2 <- sendRecv sh ("", "cdab", sId, SEND "test2, no ACK") + Resp "cdab" _ ok2 <- sendRecv sh ("", "cdab", sId, _SEND "test2, no ACK") (ok2, OK) #== "sent test message 2" - Resp "" _ (MSG mId1 _ msg1) <- tGet rh1 + Resp "" _ (MSG mId1 _ _ msg1) <- tGet rh1 (dec mId1 msg1, Right "test1") #== "test message 1 delivered to the 1st TCP connection" - Resp "abcd" _ (MSG mId2 _ msg2) <- signSendRecv rh1 rKey ("abcd", rId, ACK) + Resp "abcd" _ (MSG mId2 _ _ msg2) <- signSendRecv rh1 rKey ("abcd", rId, ACK) (dec mId2 msg2, Right "test2, no ACK") #== "test message 2 delivered, no ACK" - Resp "bcda" _ (MSG mId2' _ msg2') <- signSendRecv rh2 rKey ("bcda", rId, SUB) + Resp "bcda" _ (MSG mId2' _ _ msg2') <- signSendRecv rh2 rKey ("bcda", rId, SUB) (dec mId2' msg2', Right "test2, no ACK") #== "same simplex queue via another TCP connection, tes2 delivered again (no ACK in 1st queue)" Resp "cdab" _ OK <- signSendRecv rh2 rKey ("cdab", rId, ACK) Resp "" _ end <- tGet rh1 (end, END) #== "unsubscribed the 1st TCP connection" - Resp "dabc" _ OK <- sendRecv sh ("", "dabc", sId, SEND "test3") + Resp "dabc" _ OK <- sendRecv sh ("", "dabc", sId, _SEND "test3") - Resp "" _ (MSG mId3 _ msg3) <- tGet rh2 + Resp "" _ (MSG mId3 _ _ msg3) <- tGet rh2 (dec mId3 msg3, Right "test3") #== "delivered to the 2nd TCP connection" Resp "abcd" _ err <- signSendRecv rh1 rKey ("abcd", rId, ACK) @@ -338,15 +341,15 @@ testWithStoreLog at@(ATransport t) = writeTVar senderId1 sId1 writeTVar notifierId nId Resp "dabc" _ OK <- signSendRecv h1 nKey ("dabc", nId, NSUB) - Resp "bcda" _ OK <- signSendRecv h sKey1 ("bcda", sId1, SEND "hello") - Resp "" _ (MSG mId1 _ msg1) <- tGet h + Resp "bcda" _ OK <- signSendRecv h sKey1 ("bcda", sId1, _SEND "hello") + Resp "" _ (MSG mId1 _ _ msg1) <- tGet h (C.cbDecrypt dhShared (C.cbNonce mId1) msg1, Right "hello") #== "delivered from queue 1" Resp "" _ NMSG <- tGet h1 (sId2, rId2, rKey2, dhShared2) <- createAndSecureQueue h sPub2 atomically $ writeTVar senderId2 sId2 - Resp "cdab" _ OK <- signSendRecv h sKey2 ("cdab", sId2, SEND "hello too") - Resp "" _ (MSG mId2 _ msg2) <- tGet h + Resp "cdab" _ OK <- signSendRecv h sKey2 ("cdab", sId2, _SEND "hello too") + Resp "" _ (MSG mId2 _ _ msg2) <- tGet h (C.cbDecrypt dhShared2 (C.cbNonce mId2) msg2, Right "hello too") #== "delivered from queue 2" Resp "dabc" _ OK <- signSendRecv h rKey2 ("dabc", rId2, DEL) @@ -357,7 +360,7 @@ testWithStoreLog at@(ATransport t) = withSmpServerThreadOn at testPort . runTest t $ \h -> do sId1 <- readTVarIO senderId1 -- fails if store log is disabled - Resp "bcda" _ (ERR AUTH) <- signSendRecv h sKey1 ("bcda", sId1, SEND "hello") + Resp "bcda" _ (ERR AUTH) <- signSendRecv h sKey1 ("bcda", sId1, _SEND "hello") pure () withSmpServerStoreLogOn at testPort . runTest t $ \h -> runClient t $ \h1 -> do @@ -368,13 +371,13 @@ testWithStoreLog at@(ATransport t) = sId1 <- readTVarIO senderId1 nId <- readTVarIO notifierId Resp "dabc" _ OK <- signSendRecv h1 nKey ("dabc", nId, NSUB) - Resp "bcda" _ OK <- signSendRecv h sKey1 ("bcda", sId1, SEND "hello") - Resp "cdab" _ (MSG mId3 _ msg3) <- signSendRecv h rKey1 ("cdab", rId1, SUB) + Resp "bcda" _ OK <- signSendRecv h sKey1 ("bcda", sId1, _SEND "hello") + Resp "cdab" _ (MSG mId3 _ _ msg3) <- signSendRecv h rKey1 ("cdab", rId1, SUB) (C.cbDecrypt dh1 (C.cbNonce mId3) msg3, Right "hello") #== "delivered from restored queue" Resp "" _ NMSG <- tGet h1 -- this queue is removed - not restored sId2 <- readTVarIO senderId2 - Resp "cdab" _ (ERR AUTH) <- signSendRecv h sKey2 ("cdab", sId2, SEND "hello too") + Resp "cdab" _ (ERR AUTH) <- signSendRecv h sKey2 ("cdab", sId2, _SEND "hello too") pure () logSize `shouldReturn` 1 @@ -432,11 +435,11 @@ testTiming (ATransport t) = (sPub, sKey) <- generateKeys goodKeySize Resp "dabc" _ OK <- signSendRecv rh rKey ("dabc", rId, KEY sPub) - Resp "bcda" _ OK <- signSendRecv sh sKey ("bcda", sId, SEND "hello") - Resp "" _ (MSG mId _ msg) <- tGet rh + Resp "bcda" _ OK <- signSendRecv sh sKey ("bcda", sId, _SEND "hello") + Resp "" _ (MSG mId _ _ msg) <- tGet rh (dec mId msg, Right "hello") #== "delivered from queue" - runTimingTest sh badKey sId $ SEND "hello" + runTimingTest sh badKey sId $ _SEND "hello" where generateKeys = \case 32 -> C.generateSignatureKeyPair C.SEd25519 @@ -468,15 +471,15 @@ testMessageNotifications (ATransport t) = let dec nonce = C.cbDecrypt dhShared (C.cbNonce nonce) Resp "1" _ (NID nId) <- signSendRecv rh rKey ("1", rId, NKEY nPub) Resp "2" _ OK <- signSendRecv nh1 nKey ("2", nId, NSUB) - Resp "3" _ OK <- signSendRecv sh sKey ("3", sId, SEND "hello") - Resp "" _ (MSG mId1 _ msg1) <- tGet rh + Resp "3" _ OK <- signSendRecv sh sKey ("3", sId, _SEND "hello") + Resp "" _ (MSG mId1 _ _ msg1) <- tGet rh (dec mId1 msg1, Right "hello") #== "delivered from queue" Resp "3a" _ OK <- signSendRecv rh rKey ("3a", rId, ACK) Resp "" _ NMSG <- tGet nh1 Resp "4" _ OK <- signSendRecv nh2 nKey ("4", nId, NSUB) Resp "" _ END <- tGet nh1 - Resp "5" _ OK <- signSendRecv sh sKey ("5", sId, SEND "hello again") - Resp "" _ (MSG mId2 _ msg2) <- tGet rh + Resp "5" _ OK <- signSendRecv sh sKey ("5", sId, _SEND "hello again") + Resp "" _ (MSG mId2 _ _ msg2) <- tGet rh (dec mId2 msg2, Right "hello again") #== "delivered from queue again" Resp "" _ NMSG <- tGet nh2 1000 `timeout` tGet @BrokerMsg nh1 >>= \case @@ -492,11 +495,11 @@ testMsgExpireOnSend t = testSMPClient @c $ \sh -> do (sId, rId, rKey, dhShared) <- testSMPClient @c $ \rh -> createAndSecureQueue rh sPub let dec nonce = C.cbDecrypt dhShared (C.cbNonce nonce) - Resp "1" _ OK <- signSendRecv sh sKey ("1", sId, SEND "hello (should expire)") + Resp "1" _ OK <- signSendRecv sh sKey ("1", sId, _SEND "hello (should expire)") threadDelay 2500000 - Resp "2" _ OK <- signSendRecv sh sKey ("2", sId, SEND "hello (should NOT expire)") + Resp "2" _ OK <- signSendRecv sh sKey ("2", sId, _SEND "hello (should NOT expire)") testSMPClient @c $ \rh -> do - Resp "3" _ (MSG mId _ msg) <- signSendRecv rh rKey ("3", rId, SUB) + Resp "3" _ (MSG mId _ _ msg) <- signSendRecv rh rKey ("3", rId, SUB) (dec mId msg, Right "hello (should NOT expire)") #== "delivered" 1000 `timeout` tGet @BrokerMsg rh >>= \case Nothing -> return () @@ -510,7 +513,7 @@ testMsgExpireOnInterval t = withSmpServerConfigOn (ATransport t) cfg' testPort $ \_ -> testSMPClient @c $ \sh -> do (sId, rId, rKey, _) <- testSMPClient @c $ \rh -> createAndSecureQueue rh sPub - Resp "1" _ OK <- signSendRecv sh sKey ("1", sId, SEND "hello (should expire)") + Resp "1" _ OK <- signSendRecv sh sKey ("1", sId, _SEND "hello (should expire)") threadDelay 2500000 testSMPClient @c $ \rh -> do Resp "2" _ OK <- signSendRecv rh rKey ("2", rId, SUB) @@ -527,10 +530,10 @@ testMsgNOTExpireOnInterval t = testSMPClient @c $ \sh -> do (sId, rId, rKey, dhShared) <- testSMPClient @c $ \rh -> createAndSecureQueue rh sPub let dec nonce = C.cbDecrypt dhShared (C.cbNonce nonce) - Resp "1" _ OK <- signSendRecv sh sKey ("1", sId, SEND "hello (should NOT expire)") + Resp "1" _ OK <- signSendRecv sh sKey ("1", sId, _SEND "hello (should NOT expire)") threadDelay 2500000 testSMPClient @c $ \rh -> do - Resp "2" _ (MSG mId _ msg) <- signSendRecv rh rKey ("2", rId, SUB) + Resp "2" _ (MSG mId _ _ msg) <- signSendRecv rh rKey ("2", rId, SUB) (dec mId msg, Right "hello (should NOT expire)") #== "delivered" 1000 `timeout` tGet @BrokerMsg rh >>= \case Nothing -> return () @@ -564,9 +567,9 @@ syntaxTests (ATransport t) = do noParamsSyntaxTest "OFF" OFF_ noParamsSyntaxTest "DEL" DEL_ describe "SEND" $ do - it "valid syntax" $ (sampleSig, "cdab", "12345678", (SEND_, ' ', "hello" :: ByteString)) >#> ("", "cdab", "12345678", ERR AUTH) + it "valid syntax" $ (sampleSig, "cdab", "12345678", (SEND_, ' ', noMsgFlags, ' ', "hello" :: ByteString)) >#> ("", "cdab", "12345678", ERR AUTH) it "no parameters" $ (sampleSig, "abcd", "12345678", SEND_) >#> ("", "abcd", "12345678", ERR $ CMD SYNTAX) - it "no queue ID" $ (sampleSig, "bcda", "", (SEND_, ' ', "hello" :: ByteString)) >#> ("", "bcda", "", ERR $ CMD NO_ENTITY) + it "no queue ID" $ (sampleSig, "bcda", "", (SEND_, ' ', noMsgFlags, ' ', "hello" :: ByteString)) >#> ("", "bcda", "", ERR $ CMD NO_ENTITY) describe "PING" $ do it "valid syntax" $ ("", "abcd", "", PING_) >#> ("", "abcd", "", PONG) describe "broker response not allowed" $ do