diff --git a/protocol/simplex-messaging.md b/protocol/simplex-messaging.md index 884ae98ec..7818773d6 100644 --- a/protocol/simplex-messaging.md +++ b/protocol/simplex-messaging.md @@ -11,7 +11,8 @@ - [SMP qualities and features](#smp-qualities-and-features) - [Cryptographic algorithms](#cryptographic-algorithms) - [Simplex queue IDs](#simplex-queue-ids) -- [Server privacy requirements](#server-privacy-requirements) +- [Server security requirements](#server-security-requirements) +- [Message delivery notifications](#message-delivery-notifications) - [SMP commands](#smp-commands) - [Correlating responses with commands](#correlating-responses-with-commands) - [Command authentication](#command-authentication) @@ -20,14 +21,19 @@ - [Create queue command](#create-queue-command) - [Subscribe to queue](#subscribe-to-queue) - [Secure queue command](#secure-queue-command) + - [Enable notifications command](#enable-notifications-command) - [Acknowledge message delivery](#acknowledge-message-delivery) - [Suspend queue](#suspend-queue) - [Delete queue](#delete-queue) - [Sender commands](#sender-commands) - [Send message](#send-message) + - [Notifier commands](#notifier-commands) + - [Subscribe to queue notifications](#subscribe-to-queue-notifications) - [Server messages](#server-messages) - [Queue IDs response](#queue-ids-response) - [Deliver queue message](#deliver-queue-message) + - [Notifier queue ID response](#notifier-queue-id-response) + - [Deliver message notification](#deliver-message-notification) - [Subscription END notification](#subscription-end-notification) - [Error responses](#error-responses) - [OK response](#ok-response) @@ -280,11 +286,14 @@ Simplex messaging clients need to cryptographically sign commands for the follow - create the queue (`NEW`) - subscribe to queue (`SUB`) - secure the queue (`KEY`) + - enable queue notifications (`NKEY`) - acknowledge received messages (`ACK`) - suspend the queue (`OFF`) - delete the queue (`DEL`) - With the sender's key `SK` (server to verify): - send messages (`SEND`) +- With the optional notifier's key: + - subscribe to message notifications (`NSUB`) To sign and verify commands, clients and servers MUST use RSA-PSS algorithm defined in [RFC3447][2]. @@ -319,6 +328,18 @@ Simplex messaging server implementations MUST NOT create, store or send to any o - Any other information that may compromise privacy or [forward secrecy][4] of communication between clients using simplex messaging servers. +## Message delivery notifications + +Supporting message delivery while the client mobile app is not running requires sending push notifications with the device token. All alternative mechanisms for background message delivery are unreliable, particularly on iOS platform. Obviously, supporting push notification delivery by simply subscribing to messages would reduce meta-data privacy as it allows to see all queues that a given device uses. + +To protect the privacy of the recipients, there are several commands in SMP protocol that allow enabling and subscribing to message notifications from SMP queues, using separate set of "notifier keys" and via separate queue IDs - as long as SMP server is not compromised, these notifier queue IDs cannot be correlated with recipient or sender queue IDs. + +The clients can optionally instruct a dedicated push notification server to subscribe to notifications and deliver push notifications to the device, which can then retrieve the messages in the background and send local notifications to the user - this is out of scope of SMP protocol. The commands that SMP protocol provides to allow it: + +- `enableNotifications` (`"NKEY"`) with `notifierId` (`"NID"`) response - see [Enable notifications command](#enable-notifications-command). +- `subscribeNotifications` (`"NSUB"`) - see [Subscribe to queue notifications](#subscribe-to-queue-notifications). +- `messageNotification` (`"NMSG"`) - see [Deliver message notification](#deliver-message-notification). + ## SMP commands Commands syntax below is provided using [ABNF][8] with [case-sensitive strings extension][8a]. @@ -328,9 +349,11 @@ Each transmission between the client and the server must have this format/syntax ```abnf transmission = [signature] SP signed SP pad ; pad to the fixed block size signed = [corrId] SP [queueId] SP cmd -cmd = ping / recipientCmd / send / serverMsg -recipientCmd = create / subscribe / secure / acknowledge / suspend / delete -serverMsg = pong / queueIds / message / unsubscribed / ok / error +cmd = ping / recipientCmd / send / subscribeNotifications / serverMsg +recipientCmd = create / subscribe / secure / enableNotifications / + acknowledge / suspend / delete +serverMsg = pong / queueIds / message / notifierId / messageNotification / + unsubscribed / ok / error corrId = 1*(%x21-7F) ; any characters other than control/whitespace queueId = encoded ; empty queue ID is used with "create" command signature = encoded @@ -414,6 +437,26 @@ senderKey = %s"rsa:" x509encoded ; the sender's RSA public key for this queue Once the queue is secured only signed messages can be sent to it. +#### Enable notifications command + +This command is sent by the recipient to the server to add notifier's key to the queue, to allow push notifications server to receive notifications when the message arrives, via a separate queue ID, without receiving message content. + +```abnf +enableNotifications = %s"NKEY" SP notifierKey +notifierKey = %s"rsa:" x509encoded ; the notifier's RSA public key for this queue +``` + +The server will respond with `notifierId` response if notifications were enabled and the notifier's key was successfully added to the queue: + +```abnf +notifierId = %s"NID" SP notifierId +recipientId = encoded +``` + +This response is sent with the recipient's queue ID (the second part of the transmission). + +To receive the message notifications, `subscribeNotifications` command ("NSUB") must be sent signed with the notifier's key. + #### Acknowledge message delivery The recipient should send the acknowledgement of message delivery once the message was stored in the client, to notify the server that the message should be deleted: @@ -491,6 +534,20 @@ clientBody = *OCTET `clientHeader` in the initial unsigned message is used to transmit sender's server key and can be used in the future revisions of SMP protocol for other purposes. +### Notifier commands + +#### Subscribe to queue notifications + +The push notifications server (notifier) must use this command to start receiving message notifications from the queue: + +```abnf +subscribeNotifications = %s"NSUB" +``` + +If subscription is successful the server must respond with `ok` response if no messages are available. The notifier will be receiving the message notifications from this queue until the transport connection is closed or until another transport connection subscribes to notifications from the same simplex queue - in this case the first subscription should be cancelled and [subscription END notification](#subscription-end-notification) delivered. + +The first message notification will be delivered either immediately or as soon as the message is available. + ### Server messages #### Queue IDs response @@ -515,6 +572,22 @@ timestamp = `binaryMsg` - see syntax in [Send message](#send-message) +#### Notifier queue ID response + +Server must respond with this message when queue notifications are enabled. + +See its syntax in [Enable notifications command](#enable-notifications-command) + +#### Deliver message notification + +The server must deliver message notifications to all simplex queues that were subscribed with `subscribeNotifications` command ("NSUB") on the currently open transport connection. The syntax for the message notification delivery is: + +```abnf +messageNotification = %s"NMSG" +``` + +Message notification does not contain any message data or meta-data, it only notifies that the message is available. + #### Subscription END notification When another transport connection is subscribed to the same simplex queue, the server should unsubscribe and to send the notification to the previously subscribed transport connection: diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 87b340aae..f39dc2e05 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -30,7 +30,9 @@ module Simplex.Messaging.Client -- * SMP protocol command functions createSMPQueue, subscribeSMPQueue, + subscribeSMPQueueNotifications, secureSMPQueue, + enableSMPQueueNotifications, sendSMPMessage, ackSMPMessage, suspendSMPQueue, @@ -263,7 +265,7 @@ createSMPQueue :: createSMPQueue c rpKey rKey = -- TODO add signing this request too - requires changes in the server sendSMPCommand c (Just rpKey) "" (Cmd SRecipient $ NEW rKey) >>= \case - Cmd _ (IDS rId sId) -> return (rId, sId) + Cmd _ (IDS rId sId) -> pure (rId, sId) _ -> throwE SMPUnexpectedResponse -- | Subscribe to the SMP queue. @@ -277,12 +279,27 @@ subscribeSMPQueue c@SMPClient {smpServer, msgQ} rpKey rId = lift . atomically $ writeTBQueue msgQ (smpServer, rId, cmd) _ -> throwE SMPUnexpectedResponse +-- | Subscribe to the SMP queue notifications. +-- +-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#subscribe-to-queue-notifications +subscribeSMPQueueNotifications :: SMPClient -> NotifierPrivateKey -> NotifierId -> ExceptT SMPClientError IO () +subscribeSMPQueueNotifications = okSMPCommand $ Cmd SNotifier NSUB + -- | Secure the SMP queue by adding a sender public key. -- -- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#secure-queue-command secureSMPQueue :: SMPClient -> RecipientPrivateKey -> RecipientId -> SenderPublicKey -> ExceptT SMPClientError IO () secureSMPQueue c rpKey rId senderKey = okSMPCommand (Cmd SRecipient $ KEY senderKey) c rpKey rId +-- | Enable notifications for the queue for push notifications server. +-- +-- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#enable-notifications-command +enableSMPQueueNotifications :: SMPClient -> RecipientPrivateKey -> RecipientId -> NotifierPublicKey -> ExceptT SMPClientError IO NotifierId +enableSMPQueueNotifications c rpKey rId notifierKey = + sendSMPCommand c (Just rpKey) rId (Cmd SRecipient $ NKEY notifierKey) >>= \case + Cmd _ (NID nId) -> pure nId + _ -> throwE SMPUnexpectedResponse + -- | Send SMP message. -- -- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#send-message diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index 85d6e8369..02056acc4 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -39,10 +39,13 @@ module Simplex.Messaging.Protocol QueueId, RecipientId, SenderId, + NotifierId, RecipientPrivateKey, RecipientPublicKey, SenderPrivateKey, SenderPublicKey, + NotifierPrivateKey, + NotifierPublicKey, Encoded, MsgId, MsgBody, @@ -85,7 +88,7 @@ import Simplex.Messaging.Util import Test.QuickCheck (Arbitrary (..)) -- | SMP protocol participants. -data Party = Broker | Recipient | Sender +data Party = Broker | Recipient | Sender | Notifier deriving (Show) -- | Singleton types for SMP protocol participants. @@ -93,6 +96,7 @@ data SParty :: Party -> Type where SBroker :: SParty Broker SRecipient :: SParty Recipient SSender :: SParty Sender + SNotifier :: SParty Notifier deriving instance Show (SParty a) @@ -124,6 +128,9 @@ type RecipientId = QueueId -- | SMP queue ID for the sender. type SenderId = QueueId +-- | SMP queue ID for notifications. +type NotifierId = QueueId + -- | SMP queue ID on the server. type QueueId = Encoded @@ -133,15 +140,20 @@ data Command (a :: Party) where NEW :: RecipientPublicKey -> Command Recipient SUB :: Command Recipient KEY :: SenderPublicKey -> Command Recipient + NKEY :: NotifierPublicKey -> Command Recipient ACK :: Command Recipient OFF :: Command Recipient DEL :: Command Recipient -- SMP sender commands SEND :: MsgBody -> Command Sender PING :: Command Sender + -- SMP notification subscriber commands + NSUB :: Command Notifier -- SMP broker commands (responses, messages, notifications) IDS :: RecipientId -> SenderId -> Command Broker MSG :: MsgId -> UTCTime -> MsgBody -> Command Broker + NID :: NotifierId -> Command Broker + NMSG :: Command Broker END :: Command Broker OK :: Command Broker ERR :: ErrorType -> Command Broker @@ -178,6 +190,12 @@ type SenderPrivateKey = C.SafePrivateKey -- | Sender's public key used by SMP server to verify authorization of SMP commands. type SenderPublicKey = C.PublicKey +-- | Private key used by push notifications server to authorize (sign) LSTN command. +type NotifierPrivateKey = C.SafePrivateKey + +-- | Public key used by SMP server to verify authorization of LSTN command sent by push notifications server. +type NotifierPublicKey = C.PublicKey + -- | SMP message server ID. type MsgId = Encoded @@ -240,12 +258,16 @@ commandP = <|> "IDS " *> idsResp <|> "SUB" $> Cmd SRecipient SUB <|> "KEY " *> keyCmd + <|> "NKEY " *> nKeyCmd + <|> "NID " *> nIdsResp <|> "ACK" $> Cmd SRecipient ACK <|> "OFF" $> Cmd SRecipient OFF <|> "DEL" $> Cmd SRecipient DEL <|> "SEND " *> sendCmd <|> "PING" $> Cmd SSender PING + <|> "NSUB" $> Cmd SNotifier NSUB <|> "MSG " *> message + <|> "NMSG" $> Cmd SBroker NMSG <|> "END" $> Cmd SBroker END <|> "OK" $> Cmd SBroker OK <|> "ERR " *> serverError @@ -253,7 +275,9 @@ commandP = where newCmd = Cmd SRecipient . NEW <$> C.pubKeyP idsResp = Cmd SBroker <$> (IDS <$> (base64P <* A.space) <*> base64P) + nIdsResp = Cmd SBroker . NID <$> base64P keyCmd = Cmd SRecipient . KEY <$> C.pubKeyP + nKeyCmd = Cmd SRecipient . NKEY <$> C.pubKeyP sendCmd = do size <- A.decimal <* A.space Cmd SSender . SEND <$> A.take size <* A.space @@ -275,14 +299,23 @@ serializeCommand :: Cmd -> ByteString serializeCommand = \case Cmd SRecipient (NEW rKey) -> "NEW " <> C.serializePubKey rKey Cmd SRecipient (KEY sKey) -> "KEY " <> C.serializePubKey sKey - Cmd SRecipient cmd -> bshow cmd + Cmd SRecipient (NKEY nKey) -> "NKEY " <> C.serializePubKey nKey + Cmd SRecipient SUB -> "SUB" + Cmd SRecipient ACK -> "ACK" + Cmd SRecipient OFF -> "OFF" + Cmd SRecipient DEL -> "DEL" Cmd SSender (SEND msgBody) -> "SEND " <> serializeMsg msgBody Cmd SSender PING -> "PING" + Cmd SNotifier NSUB -> "NSUB" Cmd SBroker (MSG msgId ts msgBody) -> B.unwords ["MSG", encode msgId, B.pack $ formatISO8601Millis ts, serializeMsg msgBody] Cmd SBroker (IDS rId sId) -> B.unwords ["IDS", encode rId, encode sId] + Cmd SBroker (NID nId) -> "NID " <> encode nId Cmd SBroker (ERR err) -> "ERR " <> serializeErrorType err - Cmd SBroker resp -> bshow resp + Cmd SBroker NMSG -> "NMSG" + Cmd SBroker END -> "END" + Cmd SBroker OK -> "OK" + Cmd SBroker PONG -> "PONG" where serializeMsg msgBody = bshow (B.length msgBody) <> " " <> msgBody <> " " @@ -350,7 +383,7 @@ tGet fromParty th = liftIO (tGetParse th) >>= decodeParseValidate tCredentials :: RawTransmission -> Cmd -> Either ErrorType Cmd tCredentials (signature, _, queueId, _) cmd = case cmd of -- IDS response must not have queue ID - Cmd SBroker (IDS _ _) -> Right cmd + Cmd SBroker IDS {} -> Right cmd -- ERR response does not always have queue ID Cmd SBroker (ERR _) -> Right cmd -- PONG response must not have queue ID @@ -362,7 +395,7 @@ tGet fromParty th = liftIO (tGetParse th) >>= decodeParseValidate | B.null queueId -> Left $ CMD NO_QUEUE | otherwise -> Right cmd -- NEW must have signature but NOT queue ID - Cmd SRecipient (NEW _) + Cmd SRecipient NEW {} | B.null signature -> Left $ CMD NO_AUTH | not (B.null queueId) -> Left $ CMD HAS_AUTH | otherwise -> Right cmd @@ -375,6 +408,6 @@ tGet fromParty th = liftIO (tGetParse th) >>= decodeParseValidate | B.null queueId && B.null signature -> Right cmd | otherwise -> Left $ CMD HAS_AUTH -- other client commands must have both signature and queue ID - Cmd SRecipient _ + Cmd _ _ | B.null signature || B.null queueId -> Left $ CMD NO_AUTH | otherwise -> Right cmd diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 8c475f9d1..3d4cd634c 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -35,6 +35,7 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) import qualified Data.Map.Strict as M +import Data.Maybe (isNothing) import Data.Time.Clock import Network.Socket (ServiceName) import qualified Simplex.Messaging.Crypto as C @@ -72,20 +73,38 @@ runSMPServerBlocking started cfg@ServerConfig {transports} = do smpServer :: (MonadUnliftIO m', MonadReader Env m') => m' () smpServer = do s <- asks server - raceAny_ (serverThread s : map runServer transports) + raceAny_ + ( serverThread s subscribedQ subscribers subscriptions cancelSub : + serverThread s ntfSubscribedQ notifiers ntfSubscriptions (\_ -> pure ()) : + map runServer transports + ) `finally` withLog closeStoreLog runServer :: (MonadUnliftIO m', MonadReader Env m') => (ServiceName, ATransport) -> m' () runServer (tcpPort, ATransport t) = runTransportServer started tcpPort (runClient t) - serverThread :: MonadUnliftIO m' => Server -> m' () - serverThread Server {subscribedQ, subscribers} = forever . atomically $ do - (rId, clnt) <- readTBQueue subscribedQ - cs <- readTVar subscribers - case M.lookup rId cs of - Just Client {rcvQ} -> writeTBQueue rcvQ (CorrId B.empty, rId, Cmd SBroker END) - Nothing -> return () - writeTVar subscribers $ M.insert rId clnt cs + serverThread :: + forall m' s. + MonadUnliftIO m' => + Server -> + (Server -> TBQueue (QueueId, Client)) -> + (Server -> TVar (M.Map QueueId Client)) -> + (Client -> TVar (M.Map QueueId s)) -> + (s -> m' ()) -> + m' () + serverThread s subQ subs clientSubs unsub = forever $ do + atomically updateSubscribers >>= mapM_ unsub + where + updateSubscribers :: STM (Maybe s) + updateSubscribers = do + (qId, clnt) <- readTBQueue $ subQ s + serverSubs <- readTVar $ subs s + writeTVar (subs s) $ M.insert qId clnt serverSubs + join <$> mapM (endPreviousSubscriptions qId) (M.lookup qId serverSubs) + endPreviousSubscriptions :: QueueId -> Client -> STM (Maybe s) + endPreviousSubscriptions qId c = do + writeTBQueue (rcvQ c) (CorrId B.empty, qId, Cmd SBroker END) + stateTVar (clientSubs c) $ \ss -> (M.lookup qId ss, M.delete qId ss) runClient :: (Transport c, MonadUnliftIO m, MonadReader Env m) => TProxy c -> c -> m () runClient _ h = do @@ -134,17 +153,18 @@ verifyTransmission (sig, t@(corrId, queueId, cmd)) = do Cmd SBroker _ -> return $ smpErr INTERNAL -- it can only be client command, because `fromClient` was used Cmd SRecipient (NEW k) -> pure $ verifySignature k Cmd SRecipient _ -> verifyCmd SRecipient $ verifySignature . recipientKey - Cmd SSender (SEND _) -> verifyCmd SSender $ verifySend sig . senderKey + Cmd SSender (SEND _) -> verifyCmd SSender $ verifyMaybe sig . senderKey Cmd SSender PING -> return cmd + Cmd SNotifier NSUB -> verifyCmd SNotifier $ verifyMaybe sig . fmap snd . notifier where verifyCmd :: SParty p -> (QueueRec -> Cmd) -> m Cmd verifyCmd party f = do st <- asks queueStore q <- atomically $ getQueue st party queueId pure $ either (const $ dummyVerify authErr) f q - verifySend :: C.Signature -> Maybe SenderPublicKey -> Cmd - verifySend "" = maybe cmd (const authErr) - verifySend _ = maybe authErr verifySignature + verifyMaybe :: C.Signature -> Maybe SenderPublicKey -> Cmd + verifyMaybe "" = maybe cmd (const authErr) + verifyMaybe _ = maybe authErr verifySignature verifySignature :: C.PublicKey -> Cmd verifySignature key = if verify key then cmd else authErr verify key @@ -178,48 +198,43 @@ dummyKey512 :: C.PublicKey dummyKey512 = "MIICoDANBgkqhkiG9w0BAQEFAAOCAo0AMIICiAKCAgEArkCY9DuverJ4mmzDektv9aZMFyeRV46WZK9NsOBKEc+1ncqMs+LhLti9asKNgUBRbNzmbOe0NYYftrUpwnATaenggkTFxxbJ4JGJuGYbsEdFWkXSvrbWGtM8YUmn5RkAGme12xQ89bSM4VoJAGnrYPHwmcQd+KYCPZvTUsxaxgrJTX65ejHN9BsAn8XtGViOtHTDJO9yUMD2WrJvd7wnNa+0ugEteDLzMU++xS98VC+uA1vfauUqi3yXVchdfrLdVUuM+JE0gUEXCgzjuHkaoHiaGNiGhdPYoAJJdOKQOIHAKdk7Th6OPhirPhc9XYNB4O8JDthKhNtfokvFIFlC4QBRzJhpLIENaEBDt08WmgpOnecZB/CuxkqqOrNa8j5K5jNrtXAI67W46VEC2jeQy/gZwb64Zit2A4D00xXzGbQTPGj4ehcEMhLx5LSCygViEf0w0tN3c3TEyUcgPzvECd2ZVpQLr9Z4a07Ebr+YSuxcHhjg4Rg1VyJyOTTvaCBGm5X2B3+tI4NUttmikIHOYpBnsLmHY2BgfH2KcrIsDyAhInXmTFr/L2+erFarUnlfATd2L8Ti43TNHDedO6k6jI5Gyi62yPwjqPLEIIK8l+pIeNfHJ3pPmjhHBfzFcQLMMMXffHWNK8kWklrQXK+4j4HiPcTBvlO1FEtG9nEIZhUCgYA4a6WtI2k5YNli1C89GY5rGUY7RP71T6RWri/D3Lz9T7GvU+FemAyYmsvCQwqijUOur0uLvwSP8VdxpSUcrjJJSWur2hrPWzWlu0XbNaeizxpFeKbQP+zSrWJ1z8RwfAeUjShxt8q1TuqGqY10wQyp3nyiTGvS+KwZVj5h5qx8NQ==" client :: forall m. (MonadUnliftIO m, MonadReader Env m) => Client -> Server -> m () -client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = +client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ = sndQ'} Server {subscribedQ, ntfSubscribedQ, notifiers} = forever $ atomically (readTBQueue rcvQ) >>= processCommand - >>= atomically . writeTBQueue sndQ + >>= atomically . writeTBQueue sndQ' where processCommand :: Transmission -> m Transmission processCommand (corrId, queueId, cmd) = do st <- asks queueStore case cmd of - Cmd SBroker END -> unsubscribeQueue $> (corrId, queueId, cmd) - Cmd SBroker _ -> return (corrId, queueId, cmd) + Cmd SBroker _ -> pure (corrId, queueId, cmd) Cmd SSender command -> case command of SEND msgBody -> sendMessage st msgBody PING -> return (corrId, queueId, Cmd SBroker PONG) + Cmd SNotifier NSUB -> subscribeNotifications Cmd SRecipient command -> case command of NEW rKey -> createQueue st rKey SUB -> subscribeQueue queueId ACK -> acknowledgeMsg KEY sKey -> secureQueue_ st sKey + NKEY nKey -> addQueueNotifier_ st nKey OFF -> suspendQueue_ st DEL -> delQueueAndMsgs st where createQueue :: QueueStore -> RecipientPublicKey -> m Transmission - createQueue st rKey = - checkKeySize rKey addSubscribe + createQueue st rKey = checkKeySize rKey $ addQueueRetry 3 where - addSubscribe = - addQueueRetry 3 >>= \case - Left e -> return $ ERR e - Right (rId, sId) -> do - withLog (`logCreateById` rId) - subscribeQueue rId $> IDS rId sId - - addQueueRetry :: Int -> m (Either ErrorType (RecipientId, SenderId)) - addQueueRetry 0 = return $ Left INTERNAL + addQueueRetry :: Int -> m (Command 'Broker) + addQueueRetry 0 = pure $ ERR INTERNAL addQueueRetry n = do - ids <- getIds + ids@(rId, sId) <- getIds atomically (addQueue st rKey ids) >>= \case Left DUPLICATE_ -> addQueueRetry $ n - 1 - Left e -> return $ Left e - Right _ -> return $ Right ids + Left e -> pure $ ERR e + Right _ -> do + withLog (`logCreateById` rId) + subscribeQueue rId $> IDS rId sId logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO () logCreateById s rId = @@ -237,6 +252,20 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = withLog $ \s -> logSecureQueue s queueId sKey atomically . checkKeySize sKey $ either ERR (const OK) <$> secureQueue st queueId sKey + addQueueNotifier_ :: QueueStore -> NotifierPublicKey -> m Transmission + addQueueNotifier_ st nKey = checkKeySize nKey $ addNotifierRetry 3 + where + addNotifierRetry :: Int -> m (Command 'Broker) + addNotifierRetry 0 = pure $ ERR INTERNAL + addNotifierRetry n = do + nId <- randomId =<< asks (queueIdBytes . config) + atomically (addQueueNotifier st queueId nId nKey) >>= \case + Left DUPLICATE_ -> addNotifierRetry $ n - 1 + Left e -> pure $ ERR e + Right _ -> do + withLog $ \s -> logAddNotifier s queueId nId nKey + pure $ NID nId + checkKeySize :: Monad m' => C.PublicKey -> m' (Command 'Broker) -> m' Transmission checkKeySize key action = mkResp corrId queueId @@ -264,11 +293,13 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = writeTVar subscriptions $ M.insert rId s subs return s - unsubscribeQueue :: m () - unsubscribeQueue = do - sub <- atomically . stateTVar subscriptions $ - \cs -> (M.lookup queueId cs, M.delete queueId cs) - mapM_ cancelSub sub + subscribeNotifications :: m Transmission + subscribeNotifications = atomically $ do + subs <- readTVar ntfSubscriptions + when (isNothing $ M.lookup queueId subs) $ do + writeTBQueue ntfSubscribedQ (queueId, clnt) + writeTVar ntfSubscriptions $ M.insert queueId () subs + pure ok acknowledgeMsg :: m Transmission acknowledgeMsg = @@ -300,9 +331,20 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = quota <- asks $ msgQueueQuota . config atomically $ do q <- getMsgQueue ms (recipientId qr) quota - isFull q >>= \case - False -> writeMsg q msg $> ok - True -> pure $ err QUOTA + ifM (isFull q) (pure $ err QUOTA) $ do + trySendNotification + writeMsg q msg + pure ok + where + trySendNotification :: STM () + trySendNotification = + forM_ (notifier qr) $ \(nId, _) -> + mapM_ (writeNtf nId) . M.lookup nId =<< readTVar notifiers + + writeNtf :: NotifierId -> Client -> STM () + writeNtf nId Client {sndQ} = + unlessM (isFullTBQueue sndQ) $ + writeTBQueue sndQ $ mkResp (CorrId B.empty) nId NMSG deliverMessage :: (MsgQueue -> STM (Maybe Message)) -> RecipientId -> Sub -> m Transmission deliverMessage tryPeek rId = \case @@ -326,7 +368,7 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = subscriber :: MsgQueue -> m () subscriber q = atomically $ do msg <- peekMsg q - writeTBQueue sndQ $ mkResp (CorrId B.empty) rId (msgCmd msg) + writeTBQueue sndQ' $ mkResp (CorrId B.empty) rId (msgCmd msg) setSub (\s -> s {subThread = NoSub}) void setDelivered diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 5c397096b..5a0ebacea 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -46,11 +46,14 @@ data Env = Env data Server = Server { subscribedQ :: TBQueue (RecipientId, Client), - subscribers :: TVar (Map RecipientId Client) + subscribers :: TVar (Map RecipientId Client), + ntfSubscribedQ :: TBQueue (NotifierId, Client), + notifiers :: TVar (Map NotifierId Client) } data Client = Client { subscriptions :: TVar (Map RecipientId Sub), + ntfSubscriptions :: TVar (Map NotifierId ()), rcvQ :: TBQueue Transmission, sndQ :: TBQueue Transmission } @@ -66,14 +69,17 @@ newServer :: Natural -> STM Server newServer qSize = do subscribedQ <- newTBQueue qSize subscribers <- newTVar M.empty - return Server {subscribedQ, subscribers} + ntfSubscribedQ <- newTBQueue qSize + notifiers <- newTVar M.empty + return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers} newClient :: Natural -> STM Client newClient qSize = do subscriptions <- newTVar M.empty + ntfSubscriptions <- newTVar M.empty rcvQ <- newTBQueue qSize sndQ <- newTBQueue qSize - return Client {subscriptions, rcvQ, sndQ} + return Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} newSubscription :: STM Sub newSubscription = do @@ -94,7 +100,17 @@ newEnv config = do restoreQueues :: QueueStore -> StoreLog 'ReadMode -> m (StoreLog 'WriteMode) restoreQueues queueStore s = do (queues, s') <- liftIO $ readWriteStoreLog s - atomically $ modifyTVar queueStore $ \d -> d {queues, senders = M.foldr' addSender M.empty queues} + atomically $ + modifyTVar queueStore $ \d -> + d + { queues, + senders = M.foldr' addSender M.empty queues, + notifiers = M.foldr' addNotifier M.empty queues + } pure s' addSender :: QueueRec -> Map SenderId RecipientId -> Map SenderId RecipientId addSender q = M.insert (senderId q) (recipientId q) + addNotifier :: QueueRec -> Map NotifierId RecipientId -> Map NotifierId RecipientId + addNotifier q = case notifier q of + Nothing -> id + Just (nId, _) -> M.insert nId (recipientId q) diff --git a/src/Simplex/Messaging/Server/QueueStore.hs b/src/Simplex/Messaging/Server/QueueStore.hs index 79eb2daee..a59a60446 100644 --- a/src/Simplex/Messaging/Server/QueueStore.hs +++ b/src/Simplex/Messaging/Server/QueueStore.hs @@ -8,10 +8,11 @@ module Simplex.Messaging.Server.QueueStore where import Simplex.Messaging.Protocol data QueueRec = QueueRec - { recipientId :: QueueId, - senderId :: QueueId, + { recipientId :: RecipientId, + senderId :: SenderId, recipientKey :: RecipientPublicKey, senderKey :: Maybe SenderPublicKey, + notifier :: Maybe (NotifierId, NotifierPublicKey), status :: QueueStatus } @@ -21,6 +22,7 @@ class MonadQueueStore s m where addQueue :: s -> RecipientPublicKey -> (RecipientId, SenderId) -> m (Either ErrorType ()) getQueue :: s -> SParty (a :: Party) -> QueueId -> m (Either ErrorType QueueRec) secureQueue :: s -> RecipientId -> SenderPublicKey -> m (Either ErrorType ()) + addQueueNotifier :: s -> RecipientId -> NotifierId -> NotifierPublicKey -> m (Either ErrorType ()) suspendQueue :: s -> RecipientId -> m (Either ErrorType ()) deleteQueue :: s -> RecipientId -> m (Either ErrorType ()) @@ -31,5 +33,6 @@ mkQueueRec recipientKey (recipientId, senderId) = senderId, recipientKey, senderKey = Nothing, + notifier = Nothing, status = QueueActive } diff --git a/src/Simplex/Messaging/Server/QueueStore/STM.hs b/src/Simplex/Messaging/Server/QueueStore/STM.hs index 86caff78f..a4da5ec10 100644 --- a/src/Simplex/Messaging/Server/QueueStore/STM.hs +++ b/src/Simplex/Messaging/Server/QueueStore/STM.hs @@ -19,13 +19,14 @@ import UnliftIO.STM data QueueStoreData = QueueStoreData { queues :: Map RecipientId QueueRec, - senders :: Map SenderId RecipientId + senders :: Map SenderId RecipientId, + notifiers :: Map NotifierId RecipientId } type QueueStore = TVar QueueStoreData newQueueStore :: STM QueueStore -newQueueStore = newTVar QueueStoreData {queues = M.empty, senders = M.empty} +newQueueStore = newTVar QueueStoreData {queues = M.empty, senders = M.empty, notifiers = M.empty} instance MonadQueueStore QueueStore STM where addQueue :: QueueStore -> RecipientPublicKey -> (RecipientId, SenderId) -> STM (Either ErrorType ()) @@ -42,22 +43,47 @@ instance MonadQueueStore QueueStore STM where return $ Right () getQueue :: QueueStore -> SParty (p :: Party) -> QueueId -> STM (Either ErrorType QueueRec) - getQueue store SRecipient rId = do - cs <- readTVar store - return $ getRcpQueue cs rId - getQueue store SSender sId = do - cs <- readTVar store - let rId = M.lookup sId $ senders cs - return $ maybe (Left AUTH) (getRcpQueue cs) rId - getQueue _ SBroker _ = - return $ Left INTERNAL + getQueue st party qId = do + cs <- readTVar st + pure $ case party of + SRecipient -> getRcpQueue cs qId + SSender -> getPartyQueue cs senders + SNotifier -> getPartyQueue cs notifiers + SBroker -> Left INTERNAL + where + getPartyQueue :: + QueueStoreData -> + (QueueStoreData -> Map QueueId RecipientId) -> + Either ErrorType QueueRec + getPartyQueue cs recipientIds = + case M.lookup qId $ recipientIds cs of + Just rId -> getRcpQueue cs rId + Nothing -> Left AUTH + secureQueue :: QueueStore -> RecipientId -> SenderPublicKey -> STM (Either ErrorType ()) secureQueue store rId sKey = updateQueues store rId $ \cs c -> case senderKey c of Just _ -> (Left AUTH, cs) _ -> (Right (), cs {queues = M.insert rId c {senderKey = Just sKey} (queues cs)}) + addQueueNotifier :: QueueStore -> RecipientId -> NotifierId -> NotifierPublicKey -> STM (Either ErrorType ()) + addQueueNotifier store rId nId nKey = do + cs@QueueStoreData {queues, notifiers} <- readTVar store + if M.member nId notifiers + then pure $ Left DUPLICATE_ + else case M.lookup rId queues of + Nothing -> pure $ Left AUTH + Just q -> case notifier q of + Just _ -> pure $ Left AUTH + _ -> do + writeTVar store $ + cs + { queues = M.insert rId q {notifier = Just (nId, nKey)} queues, + notifiers = M.insert nId rId notifiers + } + pure $ Right () + suspendQueue :: QueueStore -> RecipientId -> STM (Either ErrorType ()) suspendQueue store rId = updateQueues store rId $ \cs c -> diff --git a/src/Simplex/Messaging/Server/StoreLog.hs b/src/Simplex/Messaging/Server/StoreLog.hs index 8dd468442..9f7fb5552 100644 --- a/src/Simplex/Messaging/Server/StoreLog.hs +++ b/src/Simplex/Messaging/Server/StoreLog.hs @@ -14,6 +14,7 @@ module Simplex.Messaging.Server.StoreLog closeStoreLog, logCreateQueue, logSecureQueue, + logAddNotifier, logDeleteQueue, readWriteStoreLog, ) @@ -50,36 +51,44 @@ data StoreLog (a :: IOMode) where data StoreLogRecord = CreateQueue QueueRec | SecureQueue QueueId SenderPublicKey + | AddNotifier QueueId NotifierId NotifierPublicKey | DeleteQueue QueueId storeLogRecordP :: Parser StoreLogRecord storeLogRecordP = "CREATE " *> createQueueP <|> "SECURE " *> secureQueueP + <|> "NOTIFIER " *> addNotifierP <|> "DELETE " *> (DeleteQueue <$> base64P) where createQueueP = CreateQueue <$> queueRecP secureQueueP = SecureQueue <$> base64P <* A.space <*> C.pubKeyP + addNotifierP = + AddNotifier <$> base64P <* A.space <*> base64P <* A.space <*> C.pubKeyP queueRecP = do recipientId <- "rid=" *> base64P <* A.space senderId <- "sid=" *> base64P <* A.space recipientKey <- "rk=" *> C.pubKeyP <* A.space senderKey <- "sk=" *> optional C.pubKeyP - pure QueueRec {recipientId, senderId, recipientKey, senderKey, status = QueueActive} + notifier <- optional $ (,) <$> (" nid=" *> base64P) <*> (" nk=" *> C.pubKeyP) + pure QueueRec {recipientId, senderId, recipientKey, senderKey, notifier, status = QueueActive} serializeStoreLogRecord :: StoreLogRecord -> ByteString serializeStoreLogRecord = \case CreateQueue q -> "CREATE " <> serializeQueue q SecureQueue rId sKey -> "SECURE " <> encode rId <> " " <> C.serializePubKey sKey + AddNotifier rId nId nKey -> B.unwords ["NOTIFIER", encode rId, encode nId, C.serializePubKey nKey] DeleteQueue rId -> "DELETE " <> encode rId where - serializeQueue QueueRec {recipientId, senderId, recipientKey, senderKey} = + serializeQueue QueueRec {recipientId, senderId, recipientKey, senderKey, notifier} = B.unwords [ "rid=" <> encode recipientId, "sid=" <> encode senderId, "rk=" <> C.serializePubKey recipientKey, "sk=" <> maybe "" C.serializePubKey senderKey ] + <> maybe "" serializeNotifier notifier + serializeNotifier (nId, nKey) = " nid=" <> encode nId <> " nk=" <> C.serializePubKey nKey openWriteStoreLog :: FilePath -> IO (StoreLog 'WriteMode) openWriteStoreLog f = WriteStoreLog f <$> openFile f WriteMode @@ -110,6 +119,9 @@ logCreateQueue s = writeStoreLogRecord s . CreateQueue logSecureQueue :: StoreLog 'WriteMode -> QueueId -> SenderPublicKey -> IO () logSecureQueue s qId sKey = writeStoreLogRecord s $ SecureQueue qId sKey +logAddNotifier :: StoreLog 'WriteMode -> QueueId -> NotifierId -> NotifierPublicKey -> IO () +logAddNotifier s qId nId nKey = writeStoreLogRecord s $ AddNotifier qId nId nKey + logDeleteQueue :: StoreLog 'WriteMode -> QueueId -> IO () logDeleteQueue s = writeStoreLogRecord s . DeleteQueue @@ -141,6 +153,7 @@ readQueues (ReadStoreLog _ h) = LB.hGetContents h >>= returnResult . procStoreLo procLogRecord m = \case CreateQueue q -> M.insert (recipientId q) q m SecureQueue qId sKey -> M.adjust (\q -> q {senderKey = Just sKey}) qId m + AddNotifier qId nId nKey -> M.adjust (\q -> q {notifier = Just (nId, nKey)}) qId m DeleteQueue qId -> M.delete qId m printError :: LogParsingError -> IO () printError (e, s) = B.putStrLn $ "Error parsing log: " <> B.pack e <> " - " <> s diff --git a/src/Simplex/Messaging/Util.hs b/src/Simplex/Messaging/Util.hs index 5bd05c4a9..d558a636a 100644 --- a/src/Simplex/Messaging/Util.hs +++ b/src/Simplex/Messaging/Util.hs @@ -1,4 +1,5 @@ {-# LANGUAGE InstanceSigs #-} +{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} {-# OPTIONS_GHC -fno-warn-orphans #-} @@ -42,6 +43,9 @@ f <$?> m = m >>= either fail pure . f bshow :: Show a => a -> ByteString bshow = B.pack . show +maybeWord :: (a -> ByteString) -> Maybe a -> ByteString +maybeWord f = maybe "" $ B.cons ' ' . f + liftIOEither :: (MonadIO m, MonadError e m) => IO (Either e a) -> m a liftIOEither a = liftIO a >>= liftEither @@ -53,3 +57,9 @@ liftEitherError f a = liftIOEither (first f <$> a) tryError :: MonadError e m => m a -> m (Either e a) tryError action = (Right <$> action) `catchError` (pure . Left) + +ifM :: Monad m => m Bool -> m a -> m a -> m a +ifM ba t f = ba >>= \b -> if b then t else f + +unlessM :: Monad m => m Bool -> m () -> m () +unlessM b = ifM b $ pure () diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 58a5d5163..6892baaea 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -161,6 +161,12 @@ smpTest3 _ test' = smpTestN 3 _test _test [h1, h2, h3] = test' h1 h2 h3 _test _ = error "expected 3 handles" +smpTest4 :: Transport c => TProxy c -> (THandle c -> THandle c -> THandle c -> THandle c -> IO ()) -> Expectation +smpTest4 _ test' = smpTestN 4 _test + where + _test [h1, h2, h3, h4] = test' h1 h2 h3 h4 + _test _ = error "expected 4 handles" + tPutRaw :: Transport c => THandle c -> RawTransmission -> IO () tPutRaw h (sig, corrId, queueId, command) = do let t = B.intercalate " " [corrId, queueId, command] diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index a3d93093b..3d328a4af 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -39,6 +39,7 @@ serverTests t = do describe "switch subscription to another SMP queue" $ testSwitchSub t describe "Store log" $ testWithStoreLog t describe "Timing of AUTH error" $ testTiming t + describe "Message notifications" $ testMessageNotifications t pattern Resp :: CorrId -> QueueId -> Command 'Broker -> SignedTransmissionOrError pattern Resp corrId queueId command <- ("", (corrId, queueId, Right (Cmd SBroker command))) @@ -272,14 +273,20 @@ testWithStoreLog at@(ATransport t) = it "should store simplex queues to log and restore them after server restart" $ do (sPub1, sKey1) <- C.generateKeyPair rsaKeySize (sPub2, sKey2) <- C.generateKeyPair rsaKeySize + (nPub, nKey) <- C.generateKeyPair rsaKeySize senderId1 <- newTVarIO "" senderId2 <- newTVarIO "" + notifierId <- newTVarIO "" - withSmpServerStoreLogOn at testPort . runTest t $ \h -> do - (sId1, _, _) <- createAndSecureQueue h sPub1 + withSmpServerStoreLogOn at testPort . runTest t $ \h -> runClient t $ \h1 -> do + (sId1, rId, rKey) <- createAndSecureQueue h sPub1 atomically $ writeTVar senderId1 sId1 + Resp "abcd" _ (NID nId) <- signSendRecv h rKey ("abcd", rId, "NKEY " <> C.serializePubKey nPub) + atomically $ writeTVar notifierId nId + Resp "dabc" _ OK <- signSendRecv h1 nKey ("dabc", nId, "NSUB") Resp "bcda" _ OK <- signSendRecv h sKey1 ("bcda", sId1, "SEND 5 hello ") Resp "" _ (MSG _ _ "hello") <- tGet fromServer h + Resp "" _ NMSG <- tGet fromServer h1 (sId2, rId2, rKey2) <- createAndSecureQueue h sPub2 atomically $ writeTVar senderId2 sId2 @@ -289,7 +296,7 @@ testWithStoreLog at@(ATransport t) = Resp "dabc" _ OK <- signSendRecv h rKey2 ("dabc", rId2, "DEL") pure () - logSize `shouldReturn` 5 + logSize `shouldReturn` 6 withSmpServerThreadOn at testPort . runTest t $ \h -> do sId1 <- readTVarIO senderId1 @@ -297,10 +304,12 @@ testWithStoreLog at@(ATransport t) = Resp "bcda" _ (ERR AUTH) <- signSendRecv h sKey1 ("bcda", sId1, "SEND 5 hello ") pure () - withSmpServerStoreLogOn at testPort . runTest t $ \h -> do + withSmpServerStoreLogOn at testPort . runTest t $ \h -> runClient t $ \h1 -> do -- this queue is restored sId1 <- readTVarIO senderId1 + nId <- readTVarIO notifierId Resp "bcda" _ OK <- signSendRecv h sKey1 ("bcda", sId1, "SEND 5 hello ") + Resp "dabc" _ OK <- signSendRecv h1 nKey ("dabc", nId, "NSUB") -- this queue is removed - not restored sId2 <- readTVarIO senderId2 Resp "cdab" _ (ERR AUTH) <- signSendRecv h sKey2 ("cdab", sId2, "SEND 9 hello too ") @@ -309,26 +318,29 @@ testWithStoreLog at@(ATransport t) = logSize `shouldReturn` 1 removeFile testStoreLogFile where - createAndSecureQueue :: Transport c => THandle c -> SenderPublicKey -> IO (SenderId, RecipientId, C.SafePrivateKey) - createAndSecureQueue h sPub = do - (rPub, rKey) <- C.generateKeyPair rsaKeySize - Resp "abcd" "" (IDS rId sId) <- signSendRecv h rKey ("abcd", "", "NEW " <> C.serializePubKey rPub) - let keyCmd = "KEY " <> C.serializePubKey sPub - Resp "dabc" rId' OK <- signSendRecv h rKey ("dabc", rId, keyCmd) - (rId', rId) #== "same queue ID" - pure (sId, rId, rKey) - runTest :: Transport c => TProxy c -> (THandle c -> IO ()) -> ThreadId -> Expectation runTest _ test' server = do testSMPClient test' `shouldReturn` () killThread server + runClient :: Transport c => TProxy c -> (THandle c -> IO ()) -> Expectation + runClient _ test' = testSMPClient test' `shouldReturn` () + logSize :: IO Int logSize = try (length . B.lines <$> B.readFile testStoreLogFile) >>= \case Right l -> pure l Left (_ :: SomeException) -> logSize +createAndSecureQueue :: Transport c => THandle c -> SenderPublicKey -> IO (SenderId, RecipientId, C.SafePrivateKey) +createAndSecureQueue h sPub = do + (rPub, rKey) <- C.generateKeyPair rsaKeySize + Resp "abcd" "" (IDS rId sId) <- signSendRecv h rKey ("abcd", "", "NEW " <> C.serializePubKey rPub) + let keyCmd = "KEY " <> C.serializePubKey sPub + Resp "dabc" rId' OK <- signSendRecv h rKey ("dabc", rId, keyCmd) + (rId', rId) #== "same queue ID" + pure (sId, rId, rKey) + testTiming :: ATransport -> Spec testTiming (ATransport t) = it "should have similar time for auth error, whether queue exists or not, for all key sizes" $ @@ -375,6 +387,28 @@ testTiming (ATransport t) = Resp "" _ (MSG _ _ "hello") <- tGet fromServer rh similarTime timeNoQueue timeWrongKey +testMessageNotifications :: ATransport -> Spec +testMessageNotifications (ATransport t) = + it "should create simplex connection, subscribe notifier and deliver notifications" $ do + (sPub, sKey) <- C.generateKeyPair rsaKeySize + (nPub, nKey) <- C.generateKeyPair rsaKeySize + smpTest4 t $ \rh sh nh1 nh2 -> do + (sId, rId, rKey) <- createAndSecureQueue rh sPub + Resp "1" _ (NID nId) <- signSendRecv rh rKey ("1", rId, "NKEY " <> C.serializePubKey nPub) + Resp "2" _ OK <- signSendRecv nh1 nKey ("2", nId, "NSUB") + Resp "3" _ OK <- signSendRecv sh sKey ("3", sId, "SEND 5 hello ") + Resp "" _ (MSG _ _ "hello") <- tGet fromServer rh + Resp "3a" _ OK <- signSendRecv rh rKey ("3a", rId, "ACK") + Resp "" _ NMSG <- tGet fromServer nh1 + Resp "4" _ OK <- signSendRecv nh2 nKey ("4", nId, "NSUB") + Resp "" _ END <- tGet fromServer nh1 + Resp "5" _ OK <- signSendRecv sh sKey ("5", sId, "SEND 11 hello again ") + Resp "" _ (MSG _ _ "hello again") <- tGet fromServer rh + Resp "" _ NMSG <- tGet fromServer nh2 + 1000 `timeout` tGet fromServer nh1 >>= \case + Nothing -> return () + Just _ -> error "nothing else should be delivered to the 1st notifier's TCP connection" + samplePubKey :: ByteString samplePubKey = "rsa:MIIBoDANBgkqhkiG9w0BAQEFAAOCAY0AMIIBiAKCAQEAtn1NI2tPoOGSGfad0aUg0tJ0kG2nzrIPGLiz8wb3dQSJC9xkRHyzHhEE8Kmy2cM4q7rNZIlLcm4M7oXOTe7SC4x59bLQG9bteZPKqXu9wk41hNamV25PWQ4zIcIRmZKETVGbwN7jFMpH7wxLdI1zzMArAPKXCDCJ5ctWh4OWDI6OR6AcCtEj+toCI6N6pjxxn5VigJtwiKhxYpoUJSdNM60wVEDCSUrZYBAuDH8pOxPfP+Tm4sokaFDTIG3QJFzOjC+/9nW4MUjAOFll9PCp9kaEFHJ/YmOYKMWNOCCPvLS6lxA83i0UaardkNLNoFS5paWfTlroxRwOC2T6PwO2ywKBgDjtXcSED61zK1seocQMyGRINnlWdhceD669kIHju/f6kAayvYKW3/lbJNXCmyinAccBosO08/0sUxvtuniIo18kfYJE0UmP1ReCjhMP+O+yOmwZJini/QelJk/Pez8IIDDWnY1qYQsN/q7ocjakOYrpGG7mig6JMFpDJtD6istR"