agent: delivery receipts (#752)

* rfc: delivery receipts

* update doc

* update rfc

* implementation plan, types, schema

* migration, update types

* update types

* rename migration

* export MsgReceiptStatus, JSON encoding

* update rfc, schema

* correction

Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>

* skeleton of the implementation

* more implementation (some tests fail)

* more code, 1 test fails

* fix encoding

* refactor

* refactor

* test, fix

* only send receipts in v3+, test

* flip condition

Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>

* flip condition

Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>

* agent version 4 required to send receipts

* fix test

---------

Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>
This commit is contained in:
Evgeny Poberezkin
2023-07-13 22:33:48 +01:00
committed by GitHub
parent 745a144e0c
commit 58cb2855d2
14 changed files with 649 additions and 167 deletions
+170
View File
@@ -0,0 +1,170 @@
# Delivery receipts
## Problems
User experience - users need to know that the messages are delivered to the recipient, as this confirms that the system is functioning.
The downside of communicating message delivery as it confirms that the recipient was online, and, unless there is a delay in confirming, can be used to track the location via the variation in network latency. So delivery receipts should be delayed with a randomized interval and should be opt in or opt out.
Another problem of message receipts is that they increase network traffic and server load. This could be avoided if delivery receipts are communicated as part of normal message delivery flow.
Some other existing and planned features implicitely confirm message delivery and, possibly, should depend on message delivery being enabled:
- agent message to resume delivery when quota was exceeded (implemented, [rfc](./2022-12-27-queue-quota.md))
- agent message to re-deliver skipped messages or to re-negotiate double ratchet.
## Solution
There are three layers where delivery receipts can be implemented:
- chat protocol. Pro: logic of when to deliver it is decoupled from the message flow, Con: extra traffic, can only work in duplex connections.
- agent client protocol. Pro: can be automated and combined with the protocol to re-deliver skipped messages. Con: extra traffic.
- SMP protocol. Pro: minimal extra traffic, Con: complicates server design as it would require pushing receipts when there is no next message.
The last approach seems the most promising for avoiding additional traffic:
- modify client ACK command to include whether delivery receipt should be provided to sender, and, possibly, any e2e encrypted data that should be included in the receipt (e.g., that the receiving client already saw this message in case we use "feedback" variant of roumor-mongering protocol for groups).
- server would manage delaying of the receipts, by randomizing the time after which the receipt will be available to the sender, and by combining the receipts when possible.
- modify response to SEND command to include any available delivery receipts.
- add a separate delivery receipt that will be pushed to the sender in the connection where the message was received by the server.
## SMP protocol changes
```haskell
data Command (p :: Party) where
-- ...
ACK :: MsgId -> Maybe ByteString -> Command Recipient
-- the presense of ByteString in ACK indicates that the delivery needs to be confirmed.
-- the protocol does not define the format of this confirmation, it is application specific, and can be -- an empty string.
-- And open question is how to e2e encrypt information in this string - this probably can be handled on Agent client protocol level, and could be the same ratchet key that was used to encrypt and decrypt the message. The downside of this approach is that this key currently is not stored, and storing it requires additional logic to clear these keys if unused after some time.
-- TODO consider what could be a better approach.
SENT :: MsgId -> [(MsgId, UTCTime, ByteString)] -> Command Sender
-- or
-- SENT :: MsgId -> Command Sender
-- in case we just batch
-- this response will be sent to SEND command and will include a sender's message ID generated by the server (currently it does not exist), and posibly an empty list of delivery receipts with the same message IDs as in responses to SEND, timestamps when these receipts became available, and e2e encrypted ByteString passed in ACK command.
-- The ID in this response should be different from the ID used in MSG, to keep the promise of not having shared identifiers in sent/received traffic even inside TLS tunnel.
-- Keeping the quality of shared ciphertext also requires adding additional encryption layer between the server and the sender, this can be achieved in one of two ways:
-- 1) passing a separate DH key in each SEND command, and server including additional DH key in each SENT response, with computed DH secret per message later used to encrypt and decrypt the delivery receipt payloads. This is probably a bad idea as it would increase a cryptographic load on both the server and the client.
-- 2) agree a key per queue, in the same way it is done for the recipient. Possibly, it requires additional DH key in confirmation message that the recipient then uses to secure the queue, and passing this key in KEY (secure queue) command. The response to this secure command would the include server's DH key returned to the recipient that would be passed to the sender in HELLO message. Even though recipient could observe both public DH keys, they won't know the computed shared secret. Recipient that controls the server could perform MITM attack on this key exchange, but it doesn't give any benefit over what recipient can do when they have access to the server - the threat model remains the same. The downside of this approach is that it also requires additional changes in client protocol level (confirmation message format and HELLO message).
-- 3) also agree on a key per queue, but via separate commands between the sender and the server, once the sender was notified that the queue is secured. This approach is probably better, and the server would simply delay the delivery of delivery receipts until the shared secret is agreed.
SKEY :: C.PublicKeyX25519 -> Command Sender
SBKEY :: C.PublicKeyX25519 -> BrokerMsg
-- these are the command and response to agree secret to encrypt delivery receipt payloads for option 3
SSUB :: Command Sender
-- subscribe to receive delivery receipts for a given queue - will be sent when the conversation is opened (unless there is an active subscription already), not all queues at once, and won't be re-subscribed on losing the server connection (TBC).
RCVD :: MsgId -> UTCTime -> ByteString -> Command Recipient
-- delivery receipt. UTCTime is the time when it became available, not the time when ACK was sent by the recipient, to avoid leaking location via network latency.
```
Possibly, there is no need to include delivery receipts into SENT response and instead just use batching of responses that is already supported. As server responses are not signed, there is no per-response overhead that is substantial, and a lot of receipts that are available can be packed into one block (depending on the size of payload that has to be fixed not to leak metadata).
This all seems rather complicated for SMP protocol, and the approach of doing it on a higher level seems more attractive than initially. Possibly we should reconsider, and reduce traffic by reducing block sizes... Reducing block sizes unfortunately requires supporting variable block sizes, and would leak some metadata during the transition period.
## Another approach
Above represents substantial complexity, and at least doubles server code complexity for the feature that is definitely not doubling the value of server software. Moving to variable block size is simpler, but also has a lot of complexity, reduces metadata privacy (at least for the duration of migration period), reduces image preview quality, and requires postponing this feature for multiple releases, until all clients migrate.
Given that the main traffic is generated by the groups, and direct messages do not create a lot of traffic, a much simpler and better solution is to simply send delivery receipts as the message, in direct conversations only, either as chat protocol message or as agent client protocol message (either on the message or on the envelope layer).
### Comparison of these two approaches:
**Chat protocol message**
Pros:
- simpler, more contained change - SMP layer is not aware of this feature
- easier to extend protocol with additional application specific payload, e.g. references to group DAG
Cons:
- ?
```json
// ...
"x.msg.delivered": {
"properties": {
"msgId": {"ref": "base64url"},
"params": {
"properties": {
"msgId": {"ref": "base64url"},
},
"optionalProperties": {
"data": {} // possibly the initial protocol does not need it, with JSON can be added later
}
}
}
},
// ...
```
**Agent client protocol envelope**
Pros:
- possibility of using it in a wider range of the applications
- possibility to include received message hash to increase communication integrity - the sending client would be then notified, and it can be exposed in the UI, that the received message is not the same as sent.
Cons:
- additional implementation complexity - requires additional events to communicate between chat and agent.
```haskell
data AMessage =
-- ...
| A_RCVD AgentMsgId MsgHash ByteString -- references to the received message
-- ...
```
The weirdness of the above design is that it refers to the data present in the header of another message, the alternative would be to have a separate envelope for delivery receipt:
```haskell
data AgentMessage =
-- ...
AgentMessageRcvd APrivHeader AgentMsgId MsgHash ByteString -- references to the received message
-- ...
```
But probably the first one is a bit better, TBC.
In any case there should be an additional event to notify chat client:
```haskell
data ACommand (p :: AParty) (e :: AEntity) where
-- ...
RCVD AgentMsgId MsgMeta ByteString -> ACommand Agent AEConn
-- ...
```
On the balance of things, implementing on the level of Agent Client protocol seems better, as the additional complexity is marginal, but it allows for wider range of applications, and also allows for additional delivery integrity validation. The format for payload still requires chat protocol message encoding once we want to add it, but initially it could be just an empty string.
## Implementation plan
Currently, we delete sent messages once delivered to the server. It would be helpful if we could keep the records in snd_messages table, and then use them to process delivery receipts, although it may be insufficient (we could add fields). Probably it is not possible to keep them as there is a foreign key constraint with `on delete cascade`.
The new table will be used to track sent message hashes to correlate their IDs with delivery receipts (that will also be stored in messages/rcv_messages). Receipts need to be processed in chat in the same way as normal messages, so they would be sent to chat with MsgMeta, and the chat will need to ack them once processed.
Agent ackMessage function will be also used to automatically schedule sending delivery receipts if they are enabled for connection, only for normal messages - no sending receipts to receipts.
There can be receipt re-delivery to the chat in the same cases as when normal message can be re-delivered (in case of AGENT A_DUPLICATE when it was not ack'd by the user).
## Other considerations
### How clients decide whether to send delivery receipts
Two options are possible - local settings per conversation or chat preferences framework, that allows to have mutual on/off. The latter seems preferable as without knowing whether the other party is sending receipts, it is not possible to uderstand what the absense of the receipt means - network malfunction or receipts disabled.
Groups are a special case, as while some groups may enable sending the delivery receipts, the group members should be able to disable it locally. This should probably be done via a separate conversation setting in the same way as enabling notifications or favourites. In this case the receipt would only be sent to the group if it is enabled in the group and not disabled by the member. The toggle may be located in the same page as chat preferences, but it should be a separate setting. We might want though to communicate somehow whether a given member sends delivery receipts so that other members know whether to expect them or not.
### How this functionality is released
5.2:
- support for sending and receiving delivery receipts preference, both in direct messages and in groups (for forward compatibility).
- support for sending and receiving delivery receipts in direct chats only, but disable sending them
5.3:
- show receipt preferences in the UI
- enable sending receipts in direct chats where they are enabled
A separate question is how to enable this functionality for the existing contacts. Possible options are:
1. Enable (as per default) for all contacts, show notification to the user when they open the app for update that delivery notifications are now sent by default to all contacts. Pro: no extra logic to implement. Con: may be perceived as a privacy violation, as to some contacts the delivery receipts will be sent before the user had chance to disable them.
2. Ask the user when the new version first runs whether they want to use delivery receipts and offer these options:
1) keep enabled for all profiles (and for all contacts)
2) enable for all profiles in ~12 or in ~24 hours giving users the chance to review all contacts / profiles and disable some of them. The problem here is also in possibility of the correlation in case they all start sending receipts at the same time. Possibly the option could be to set a random time in 12-15 or 24-30 hours range to avoid the possibility of such correlation.
3) disable for all profiles it will require sending profile updates to all contacts, so the delivery receipts should be kept disabled and profile updates should be sent after random intervals, one by one (not scheduled all at once, as the time may pass while the app is off).
3. Offer an option to enable globally later - we could keep it as a one-off option, available only to existing users, and visible on the top level of the Settings - once enabled, the option will disappear and it won't be possible to disable again. The downside here is that the new contacts would be receiving the profile with enabled notifications but they still won't be delivered...
4. Another option is to have all new contacts decided based on a global user default (that can be set in the settings and in the dialog on first start), but for the existing contacts keep in unset state that is not interpreted as either on or off, but interpreted as unknown until the user makes a choice... That might be an optimal solution for the users but it would probably require changing the preferences framework or some ad-hoc hacks. That still keeps the question open how to avoid correlation between profiles.
5. That might be the case for version agreement too - the availability of the option per contact will depend on the version. It doesn't answer the question what to do with global defaults though...
+1 -1
View File
@@ -34,7 +34,6 @@ flag swift
library
exposed-modules:
Simplex.FileTransfer
Simplex.FileTransfer.Agent
Simplex.FileTransfer.Client
Simplex.FileTransfer.Client.Agent
@@ -85,6 +84,7 @@ library
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230516_encrypted_rcv_message_hashes
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230531_switch_status
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230615_ratchet_sync
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230701_delivery_receipts
Simplex.Messaging.Agent.TAsyncs
Simplex.Messaging.Agent.TRcvQueues
Simplex.Messaging.Client
-14
View File
@@ -1,14 +0,0 @@
module Simplex.FileTransfer where
-- TODO
-- Protocol
-- Store (in memory storage)
-- StoreLog (append only log)
-- FileDescription
-- Server
-- Client
-- Server/Main (server CLI)
-- Client/Main (client CLI)
--
-- Transport for HTTP2 ?
-- streaming Crypto
+84 -39
View File
@@ -117,7 +117,7 @@ import Data.List.NonEmpty (NonEmpty (..))
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (fromMaybe, isJust, isNothing)
import Data.Maybe (fromMaybe, isJust, isNothing, catMaybes)
import Data.Text (Text)
import qualified Data.Text as T
import Data.Time.Clock
@@ -152,7 +152,6 @@ import Simplex.Messaging.Util
import Simplex.Messaging.Version
import UnliftIO.Async (async, race_)
import UnliftIO.Concurrent (forkFinally, forkIO, threadDelay)
import qualified UnliftIO.Exception as E
import UnliftIO.STM
-- import GHC.Conc (unsafeIOToSTM)
@@ -203,8 +202,8 @@ acceptContactAsync :: AgentErrorMonad m => AgentClient -> ACorrId -> Bool -> Con
acceptContactAsync c corrId enableNtfs = withAgentEnv c .: acceptContactAsync' c corrId enableNtfs
-- | Acknowledge message (ACK command) asynchronously, no synchronous response
ackMessageAsync :: forall m. AgentErrorMonad m => AgentClient -> ACorrId -> ConnId -> AgentMsgId -> m ()
ackMessageAsync c = withAgentEnv c .:. ackMessageAsync' c
ackMessageAsync :: forall m. AgentErrorMonad m => AgentClient -> ACorrId -> ConnId -> AgentMsgId -> Maybe MsgReceiptInfo -> m ()
ackMessageAsync c = withAgentEnv c .:: ackMessageAsync' c
-- | Switch connection to the new receive queue
switchConnectionAsync :: AgentErrorMonad m => AgentClient -> ACorrId -> ConnId -> m ConnectionStats
@@ -264,8 +263,8 @@ resubscribeConnections c = withAgentEnv c . resubscribeConnections' c
sendMessage :: AgentErrorMonad m => AgentClient -> ConnId -> MsgFlags -> MsgBody -> m AgentMsgId
sendMessage c = withAgentEnv c .:. sendMessage' c
ackMessage :: AgentErrorMonad m => AgentClient -> ConnId -> AgentMsgId -> m ()
ackMessage c = withAgentEnv c .: ackMessage' c
ackMessage :: AgentErrorMonad m => AgentClient -> ConnId -> AgentMsgId -> Maybe MsgReceiptInfo -> m ()
ackMessage c = withAgentEnv c .:. ackMessage' c
-- | Switch connection to the new receive queue
switchConnection :: AgentErrorMonad m => AgentClient -> ConnId -> m ConnectionStats
@@ -432,7 +431,7 @@ processCommand c (connId, APC e cmd) =
RJCT invId -> rejectContact' c connId invId $> (connId, OK)
SUB -> subscribeConnection' c connId $> (connId, OK)
SEND msgFlags msgBody -> (connId,) . MID <$> sendMessage' c connId msgFlags msgBody
ACK msgId -> ackMessage' c connId msgId $> (connId, OK)
ACK msgId rcptInfo_ -> ackMessage' c connId msgId rcptInfo_ $> (connId, OK)
SWCH -> switchConnection' c connId $> (connId, OK)
OFF -> suspendConnection' c connId $> (connId, OK)
DEL -> deleteConnection' c connId $> (connId, OK)
@@ -507,8 +506,8 @@ acceptContactAsync' c corrId enableNtfs invId ownConnInfo = do
throwError err
_ -> throwError $ CMD PROHIBITED
ackMessageAsync' :: forall m. AgentMonad m => AgentClient -> ACorrId -> ConnId -> AgentMsgId -> m ()
ackMessageAsync' c corrId connId msgId = do
ackMessageAsync' :: forall m. AgentMonad m => AgentClient -> ACorrId -> ConnId -> AgentMsgId -> Maybe MsgReceiptInfo -> m ()
ackMessageAsync' c corrId connId msgId rcptInfo_ = do
SomeConn cType _ <- withStore c (`getConn` connId)
case cType of
SCDuplex -> enqueueAck
@@ -519,8 +518,11 @@ ackMessageAsync' c corrId connId msgId = do
where
enqueueAck :: m ()
enqueueAck = do
(RcvQueue {server}, _) <- withStore c $ \db -> setMsgUserAck db connId $ InternalId msgId
enqueueCommand c corrId connId (Just server) . AClientCommand $ APC SAEConn $ ACK msgId
let mId = InternalId msgId
RcvMsg {msgType} <- withStore c $ \db -> getRcvMsg db connId mId
when (isJust rcptInfo_ && msgType /= AM_A_MSG_) $ throwError $ CMD PROHIBITED
(RcvQueue {server}, _) <- withStore c $ \db -> setMsgUserAck db connId mId
enqueueCommand c corrId connId (Just server) . AClientCommand $ APC SAEConn $ ACK msgId rcptInfo_
deleteConnectionAsync' :: forall m. AgentMonad m => AgentClient -> ConnId -> m ()
deleteConnectionAsync' c connId = deleteConnectionsAsync' c [connId]
@@ -891,7 +893,7 @@ runCommandProcessing c@AgentClient {subQ} server_ = do
void $ joinConnSrv c userId connId True enableNtfs cReq connInfo srv
notify OK
LET confId ownCInfo -> withServer' . tryCommand $ allowConnection' c connId confId ownCInfo >> notify OK
ACK msgId -> withServer' . tryCommand $ ackMessage' c connId msgId >> notify OK
ACK msgId rcptInfo_ -> withServer' . tryCommand $ ackMessage' c connId msgId rcptInfo_ >> notify OK
SWCH ->
noServer . tryCommand . withConnLock c connId "switchConnection" $
withStore c (`getConn` connId) >>= \case
@@ -1112,6 +1114,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl
_ -> connError msgId NOT_ACCEPTED
AM_REPLY_ -> notifyDel msgId err
AM_A_MSG_ -> notifyDel msgId err
AM_A_RCVD_ -> notifyDel msgId err
AM_QCONT_ -> notifyDel msgId err
AM_QADD_ -> qError msgId "QADD: AUTH"
AM_QKEY_ -> qError msgId "QKEY: AUTH"
@@ -1166,6 +1169,7 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl
qInfo <- createReplyQueue c cData sq srv
void . enqueueMessage c cData sq SMP.noMsgFlags $ REPLY [qInfo]
AM_A_MSG_ -> notify $ SENT mId
AM_A_RCVD_ -> pure ()
AM_QCONT_ -> pure ()
AM_QADD_ -> pure ()
AM_QKEY_ -> do
@@ -1200,10 +1204,12 @@ runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, dupl
_ -> internalErr msgId "sent QTEST: queue not in connection or not replacing another queue"
_ -> internalErr msgId "QTEST sent not in duplex connection"
AM_EREADY_ -> pure ()
delMsg msgId
delMsgKeep (msgType == AM_A_MSG_) msgId
where
delMsg :: InternalId -> m ()
delMsg msgId = withStore' c $ \db -> deleteSndMsgDelivery db connId sq msgId
delMsg = delMsgKeep False
delMsgKeep :: Bool -> InternalId -> m ()
delMsgKeep keepForReceipt msgId = withStore' c $ \db -> deleteSndMsgDelivery db connId sq msgId keepForReceipt
notify :: forall e. AEntityI e => ACommand 'Agent e -> m ()
notify cmd = atomically $ writeTBQueue subQ ("", connId, APC (sAEntity @e) cmd)
notifyDel :: AEntityI e => InternalId -> ACommand 'Agent e -> m ()
@@ -1220,22 +1226,38 @@ retrySndOp c loop = do
atomically $ beginAgentOperation c AOSndNetwork
loop
ackMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> AgentMsgId -> m ()
ackMessage' c connId msgId = withConnLock c connId "ackMessage" $ do
ackMessage' :: forall m. AgentMonad m => AgentClient -> ConnId -> AgentMsgId -> Maybe MsgReceiptInfo -> m ()
ackMessage' c connId msgId rcptInfo_ = withConnLock c connId "ackMessage" $ do
SomeConn _ conn <- withStore c (`getConn` connId)
case conn of
DuplexConnection {} -> ack
RcvConnection {} -> ack
DuplexConnection {} -> ack >> sendRcpt conn >> del
RcvConnection {} -> ack >> del
SndConnection {} -> throwError $ CONN SIMPLEX
ContactConnection {} -> throwError $ CMD PROHIBITED
NewConnection _ -> throwError $ CMD PROHIBITED
where
ack :: m ()
ack = do
let mId = InternalId msgId
(rq, srvMsgId) <- withStore c $ \db -> setMsgUserAck db connId mId
-- the stored message was delivered via a specific queue, the rest failed to decrypt and were already acknowledged
(rq, srvMsgId) <- withStore c $ \db -> setMsgUserAck db connId $ InternalId msgId
ackQueueMessage c rq srvMsgId
withStore' c $ \db -> deleteMsg db connId mId
del :: m ()
del = withStore' c $ \db -> deleteMsg db connId $ InternalId msgId
sendRcpt :: Connection 'CDuplex -> m ()
sendRcpt (DuplexConnection cData _ sqs) = do
msg@RcvMsg {msgType, msgReceipt} <- withStore c $ \db -> getRcvMsg db connId $ InternalId msgId
case rcptInfo_ of
Just rcptInfo -> do
unless (msgType == AM_A_MSG_) $ throwError (CMD PROHIBITED)
when (messageRcptsSupported cData) $ do
let RcvMsg {msgMeta = MsgMeta {sndMsgId}, internalHash} = msg
rcpt = A_RCVD [AMessageReceipt {agentMsgId = sndMsgId, msgHash = internalHash, rcptInfo}]
void $ enqueueMessages c cData sqs SMP.MsgFlags {notification = False} rcpt
Nothing -> case (msgType, msgReceipt) of
-- only remove sent message if receipt hash was Ok, both to debug and for future redundancy
(AM_A_RCVD_, Just MsgReceipt {agentMsgId = sndMsgId, msgRcptStatus = MROk}) ->
withStore' c $ \db -> deleteDeliveredSndMsg db connId $ InternalId sndMsgId
_ -> pure ()
switchConnection' :: AgentMonad m => AgentClient -> ConnId -> m ConnectionStats
switchConnection' c connId =
@@ -1725,30 +1747,30 @@ cleanupManager c@AgentClient {subQ} = do
delay <- asks (initialCleanupDelay . config)
liftIO $ threadDelay' delay
int <- asks (cleanupInterval . config)
ttl <- asks $ storedMsgDataTTL . config
forever $ do
void . runExceptT $ do
deleteConns `catchAgentError` (notify "" . ERR)
deleteRcvMsgHashes `catchAgentError` (notify "" . ERR)
deleteProcessedRatchetKeyHashes `catchAgentError` (notify "" . ERR)
deleteRcvFilesExpired `catchAgentError` (notify "" . RFERR)
deleteRcvFilesDeleted `catchAgentError` (notify "" . RFERR)
deleteRcvFilesTmpPaths `catchAgentError` (notify "" . RFERR)
deleteSndFilesExpired `catchAgentError` (notify "" . SFERR)
deleteSndFilesDeleted `catchAgentError` (notify "" . SFERR)
deleteSndFilesPrefixPaths `catchAgentError` (notify "" . SFERR)
deleteExpiredReplicasForDeletion `catchAgentError` (notify "" . SFERR)
run ERR deleteConns
run ERR $ withStore' c (`deleteRcvMsgHashesExpired` ttl)
run ERR $ withStore' c (`deleteSndMsgsExpired` ttl)
run ERR $ withStore' c (`deleteRatchetKeyHashesExpired` ttl)
run RFERR deleteRcvFilesExpired
run RFERR deleteRcvFilesDeleted
run RFERR deleteRcvFilesTmpPaths
run SFERR deleteSndFilesExpired
run SFERR deleteSndFilesDeleted
run SFERR deleteSndFilesPrefixPaths
run SFERR deleteExpiredReplicasForDeletion
liftIO $ threadDelay' int
where
run :: forall e. AEntityI e => (AgentErrorType -> ACommand 'Agent e) -> ExceptT AgentErrorType m () -> m ()
run err a = do
void . runExceptT $ a `catchAgentError` (notify "" . err)
step <- asks $ cleanupStepInterval . config
liftIO $ threadDelay step
deleteConns =
withLock (deleteLock c) "cleanupManager" $ do
void $ withStore' c getDeletedConnIds >>= deleteDeletedConns c
withStore' c deleteUsersWithoutConns >>= mapM_ (notify "" . DEL_USER)
deleteRcvMsgHashes = do
rcvMsgHashesTTL <- asks $ rcvMsgHashesTTL . config
withStore' c (`deleteRcvMsgHashesExpired` rcvMsgHashesTTL)
deleteProcessedRatchetKeyHashes = do
rkHashesTTL <- asks $ processedRatchetKeyHashesTTL . config
withStore' c (`deleteProcessedRatchetKeyHashesExpired` rkHashesTTL)
deleteRcvFilesExpired = do
rcvFilesTTL <- asks $ rcvFilesTTL . config
rcvExpired <- withStore' c (`getRcvFilesExpired` rcvFilesTTL)
@@ -1855,6 +1877,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s
A_MSG body -> do
logServer "<--" c srv rId "MSG <MSG>"
notify $ MSG msgMeta msgFlags body
A_RCVD rcpts -> qDuplex conn'' "RCVD" $ messagesRcvd rcpts msgMeta
QCONT addr -> qDuplexAckDel conn'' "QCONT" $ continueSending addr
QADD qs -> qDuplexAckDel conn'' "QADD" $ qAddMsg qs
QKEY qs -> qDuplexAckDel conn'' "QKEY" $ qKeyMsg qs
@@ -2078,6 +2101,28 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s
void $ tryPutTMVar qLock ()
Nothing -> qError "QCONT: queue address not found"
messagesRcvd :: NonEmpty AMessageReceipt -> MsgMeta -> Connection 'CDuplex -> m ()
messagesRcvd rcpts msgMeta@MsgMeta {broker = (srvMsgId, _)} _ = do
logServer "<--" c srv rId "MSG <RCPT>"
rs <- forM rcpts $ \rcpt -> clientReceipt rcpt `catchAgentError` \e -> notify (ERR e) $> Nothing
case L.nonEmpty . catMaybes $ L.toList rs of
Just rs' -> notify $ RCVD msgMeta rs' -- client must ACK once processed
Nothing -> enqueueCmd $ ICAck rId srvMsgId
where
clientReceipt :: AMessageReceipt -> m (Maybe MsgReceipt)
clientReceipt AMessageReceipt {agentMsgId, msgHash} = do
let sndMsgId = InternalSndId agentMsgId
SndMsg {internalId = InternalId msgId, msgType, internalHash, msgReceipt} <- withStore c $ \db -> getSndMsgViaRcpt db connId sndMsgId
if msgType /= AM_A_MSG_
then notify (ERR $ AGENT A_PROHIBITED) $> Nothing -- unexpected message type for receipt
else case msgReceipt of
Just MsgReceipt {msgRcptStatus = MROk} -> pure Nothing -- already notified with MROk status
_ -> do
let msgRcptStatus = if msgHash == internalHash then MROk else MRBadMsgHash
rcpt = MsgReceipt {agentMsgId = msgId, msgRcptStatus}
withStore' c $ \db -> updateSndMsgRcpt db connId sndMsgId rcpt
pure $ Just rcpt
-- processed by queue sender
qAddMsg :: NonEmpty (SMPQueueUri, Maybe SndQAddr) -> Connection 'CDuplex -> m ()
qAddMsg ((_, Nothing) :| _) _ = qError "adding queue without switching is not supported"
@@ -2195,7 +2240,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s
rkHash k1 k2 = C.sha256Hash $ C.pubKeyBytes k1 <> C.pubKeyBytes k2
ratchetExists :: m Bool
ratchetExists = withStore' c $ \db -> do
exists <- checkProcessedRatchetKeyHashExists db connId rkHashRcv
exists <- checkRatchetKeyHashExists db connId rkHashRcv
unless exists $ addProcessedRatchetKeyHash db connId rkHashRcv
pure exists
getSendRatchetKeys :: m (C.PrivateKeyX448, C.PrivateKeyX448, C.PublicKeyX448, C.PublicKeyX448)
+4 -4
View File
@@ -87,8 +87,8 @@ data AgentConfig = AgentConfig
helloTimeout :: NominalDiffTime,
initialCleanupDelay :: Int64,
cleanupInterval :: Int64,
rcvMsgHashesTTL :: NominalDiffTime,
processedRatchetKeyHashesTTL :: NominalDiffTime,
cleanupStepInterval :: Int,
storedMsgDataTTL :: NominalDiffTime,
rcvFilesTTL :: NominalDiffTime,
sndFilesTTL :: NominalDiffTime,
xftpNotifyErrsOnRetry :: Bool,
@@ -152,8 +152,8 @@ defaultAgentConfig =
helloTimeout = 2 * nominalDay,
initialCleanupDelay = 30 * 1000000, -- 30 seconds
cleanupInterval = 30 * 60 * 1000000, -- 30 minutes
rcvMsgHashesTTL = 30 * nominalDay,
processedRatchetKeyHashesTTL = 30 * nominalDay,
cleanupStepInterval = 200000, -- 200ms
storedMsgDataTTL = 21 * nominalDay,
rcvFilesTTL = 2 * nominalDay,
sndFilesTTL = nominalDay,
xftpNotifyErrsOnRetry = True,
+109 -35
View File
@@ -69,6 +69,10 @@ module Simplex.Messaging.Agent.Protocol
AgentMessageType (..),
APrivHeader (..),
AMessage (..),
AMessageReceipt (..),
MsgReceipt (..),
MsgReceiptInfo,
MsgReceiptStatus (..),
SndQAddr,
SMPServer,
pattern SMPServer,
@@ -211,7 +215,7 @@ import Text.Read
import UnliftIO.Exception (Exception)
currentSMPAgentVersion :: Version
currentSMPAgentVersion = 3
currentSMPAgentVersion = 4
supportedSMPAgentVRange :: VersionRange
supportedSMPAgentVRange = mkVersionRange 1 currentSMPAgentVersion
@@ -314,7 +318,7 @@ data ACommand (p :: AParty) (e :: AEntity) where
JOIN :: Bool -> AConnectionRequestUri -> ConnInfo -> ACommand Client AEConn -- response OK
CONF :: ConfirmationId -> [SMPServer] -> ConnInfo -> ACommand Agent AEConn -- ConnInfo is from sender, [SMPServer] will be empty only in v1 handshake
LET :: ConfirmationId -> ConnInfo -> ACommand Client AEConn -- ConnInfo is from client
REQ :: InvitationId -> L.NonEmpty SMPServer -> ConnInfo -> ACommand Agent AEConn -- ConnInfo is from sender
REQ :: InvitationId -> NonEmpty SMPServer -> ConnInfo -> ACommand Agent AEConn -- ConnInfo is from sender
ACPT :: InvitationId -> ConnInfo -> ACommand Client AEConn -- ConnInfo is from client
RJCT :: InvitationId -> ACommand Client AEConn
INFO :: ConnInfo -> ACommand Agent AEConn
@@ -332,7 +336,8 @@ data ACommand (p :: AParty) (e :: AEntity) where
SENT :: AgentMsgId -> ACommand Agent AEConn
MERR :: AgentMsgId -> AgentErrorType -> ACommand Agent AEConn
MSG :: MsgMeta -> MsgFlags -> MsgBody -> ACommand Agent AEConn
ACK :: AgentMsgId -> ACommand Client AEConn
ACK :: AgentMsgId -> Maybe MsgReceiptInfo -> ACommand Client AEConn
RCVD :: MsgMeta -> NonEmpty MsgReceipt -> ACommand Agent AEConn
SWCH :: ACommand Client AEConn
OFF :: ACommand Client AEConn
DEL :: ACommand Client AEConn
@@ -392,6 +397,7 @@ data ACommandTag (p :: AParty) (e :: AEntity) where
MERR_ :: ACommandTag Agent AEConn
MSG_ :: ACommandTag Agent AEConn
ACK_ :: ACommandTag Client AEConn
RCVD_ :: ACommandTag Agent AEConn
SWCH_ :: ACommandTag Client AEConn
OFF_ :: ACommandTag Client AEConn
DEL_ :: ACommandTag Client AEConn
@@ -443,7 +449,8 @@ aCommandTag = \case
SENT _ -> SENT_
MERR {} -> MERR_
MSG {} -> MSG_
ACK _ -> ACK_
ACK {} -> ACK_
RCVD {} -> RCVD_
SWCH -> SWCH_
OFF -> OFF_
DEL -> DEL_
@@ -743,6 +750,25 @@ data MsgMeta = MsgMeta
}
deriving (Eq, Show)
instance StrEncoding MsgMeta where
strEncode MsgMeta {integrity, recipient = (rmId, rTs), broker = (bmId, bTs), sndMsgId} =
B.unwords
[ strEncode integrity,
"R=" <> bshow rmId <> "," <> showTs rTs,
"B=" <> encode bmId <> "," <> showTs bTs,
"S=" <> bshow sndMsgId
]
where
showTs = B.pack . formatISO8601Millis
strP = do
integrity <- strP
recipient <- " R=" *> partyMeta A.decimal
broker <- " B=" *> partyMeta base64P
sndMsgId <- " S=" *> A.decimal
pure MsgMeta {integrity, recipient, broker, sndMsgId}
where
partyMeta idParser = (,) <$> idParser <* A.char ',' <*> tsISO8601P
data SMPConfirmation = SMPConfirmation
{ -- | sender's public key to use for authentication of sender's commands at the recepient's server
senderKey :: SndPublicVerifyKey,
@@ -815,7 +841,7 @@ data AgentMessage
= AgentConnInfo ConnInfo
| -- AgentConnInfoReply is only used in duplexHandshake mode (v2), allowing to include reply queue(s) in the initial confirmation.
-- It makes REPLY message unnecessary.
AgentConnInfoReply (L.NonEmpty SMPQueueInfo) ConnInfo
AgentConnInfoReply (NonEmpty SMPQueueInfo) ConnInfo
| AgentRatchetInfo ByteString
| AgentMessage APrivHeader AMessage
deriving (Show)
@@ -841,6 +867,7 @@ data AgentMessageType
| AM_HELLO_
| AM_REPLY_
| AM_A_MSG_
| AM_A_RCVD_
| AM_QCONT_
| AM_QADD_
| AM_QKEY_
@@ -857,6 +884,7 @@ instance Encoding AgentMessageType where
AM_HELLO_ -> "H"
AM_REPLY_ -> "R"
AM_A_MSG_ -> "M"
AM_A_RCVD_ -> "V"
AM_QCONT_ -> "QC"
AM_QADD_ -> "QA"
AM_QKEY_ -> "QK"
@@ -871,6 +899,7 @@ instance Encoding AgentMessageType where
'H' -> pure AM_HELLO_
'R' -> pure AM_REPLY_
'M' -> pure AM_A_MSG_
'V' -> pure AM_A_RCVD_
'Q' ->
A.anyChar >>= \case
'C' -> pure AM_QCONT_
@@ -896,6 +925,7 @@ agentMessageType = \case
-- REPLY is only used in v1
REPLY _ -> AM_REPLY_
A_MSG _ -> AM_A_MSG_
A_RCVD {} -> AM_A_RCVD_
QCONT _ -> AM_QCONT_
QADD _ -> AM_QADD_
QKEY _ -> AM_QKEY_
@@ -920,6 +950,7 @@ data AMsgType
= HELLO_
| REPLY_
| A_MSG_
| A_RCVD_
| QCONT_
| QADD_
| QKEY_
@@ -933,6 +964,7 @@ instance Encoding AMsgType where
HELLO_ -> "H"
REPLY_ -> "R"
A_MSG_ -> "M"
A_RCVD_ -> "V"
QCONT_ -> "QC"
QADD_ -> "QA"
QKEY_ -> "QK"
@@ -944,6 +976,7 @@ instance Encoding AMsgType where
'H' -> pure HELLO_
'R' -> pure REPLY_
'M' -> pure A_MSG_
'V' -> pure A_RCVD_
'Q' ->
A.anyChar >>= \case
'C' -> pure QCONT_
@@ -962,23 +995,60 @@ data AMessage
= -- | the first message in the queue to validate it is secured
HELLO
| -- | reply queues information
REPLY (L.NonEmpty SMPQueueInfo)
REPLY (NonEmpty SMPQueueInfo)
| -- | agent envelope for the client message
A_MSG MsgBody
| -- | agent envelope for delivery receipt
A_RCVD (NonEmpty AMessageReceipt)
| -- | the message instructing the client to continue sending messages (after ERR QUOTA)
QCONT SndQAddr
| -- add queue to connection (sent by recipient), with optional address of the replaced queue
QADD (L.NonEmpty (SMPQueueUri, Maybe SndQAddr))
QADD (NonEmpty (SMPQueueUri, Maybe SndQAddr))
| -- key to secure the added queues and agree e2e encryption key (sent by sender)
QKEY (L.NonEmpty (SMPQueueInfo, SndPublicVerifyKey))
QKEY (NonEmpty (SMPQueueInfo, SndPublicVerifyKey))
| -- inform that the queues are ready to use (sent by recipient)
QUSE (L.NonEmpty (SndQAddr, Bool))
QUSE (NonEmpty (SndQAddr, Bool))
| -- sent by the sender to test new queues and to complete switching
QTEST (L.NonEmpty SndQAddr)
QTEST (NonEmpty SndQAddr)
| -- ratchet re-synchronization is complete, with last decrypted sender message id (recipient's `last_external_snd_msg_id`)
EREADY Int64
EREADY AgentMsgId
deriving (Show)
-- | this type is used to send as part of the protocol between different clients
-- TODO possibly, rename fields and types referring to external and internal IDs to make them different
data AMessageReceipt = AMessageReceipt
{ agentMsgId :: AgentMsgId, -- this is an external snd message ID referenced by the message recipient
msgHash :: MsgHash,
rcptInfo :: MsgReceiptInfo
}
deriving (Show)
-- | this type is used as part of agent protocol to communicate with the user application
data MsgReceipt = MsgReceipt
{ agentMsgId :: AgentMsgId, -- this is an internal agent message ID of received message
msgRcptStatus :: MsgReceiptStatus
}
deriving (Eq, Show)
data MsgReceiptStatus = MROk | MRBadMsgHash
deriving (Eq, Show)
instance StrEncoding MsgReceiptStatus where
strEncode = \case
MROk -> "ok"
MRBadMsgHash -> "badMsgHash"
strP =
A.takeWhile1 (/= ' ') >>= \ case
"ok" -> pure MROk
"badMsgHash" -> pure MRBadMsgHash
_ -> fail "bad MsgReceiptStatus"
instance ToJSON MsgReceiptStatus where
toJSON = strToJSON
toEncoding = strToJEncoding
type MsgReceiptInfo = ByteString
type SndQAddr = (SMPServer, SMP.SenderId)
instance Encoding AMessage where
@@ -986,6 +1056,7 @@ instance Encoding AMessage where
HELLO -> smpEncode HELLO_
REPLY smpQueues -> smpEncode (REPLY_, smpQueues)
A_MSG body -> smpEncode (A_MSG_, Tail body)
A_RCVD mrs -> smpEncode (A_RCVD_, mrs)
QCONT addr -> smpEncode (QCONT_, addr)
QADD qs -> smpEncode (QADD_, qs)
QKEY qs -> smpEncode (QKEY_, qs)
@@ -998,6 +1069,7 @@ instance Encoding AMessage where
HELLO_ -> pure HELLO
REPLY_ -> REPLY <$> smpP
A_MSG_ -> A_MSG . unTail <$> smpP
A_RCVD_ -> A_RCVD <$> smpP
QCONT_ -> QCONT <$> smpP
QADD_ -> QADD <$> smpP
QKEY_ -> QKEY <$> smpP
@@ -1005,6 +1077,21 @@ instance Encoding AMessage where
QTEST_ -> QTEST <$> smpP
EREADY_ -> EREADY <$> smpP
instance Encoding AMessageReceipt where
smpEncode AMessageReceipt {agentMsgId, msgHash, rcptInfo} =
smpEncode (agentMsgId, msgHash, Large rcptInfo)
smpP = do
(agentMsgId, msgHash, Large rcptInfo) <- smpP
pure AMessageReceipt {agentMsgId, msgHash, rcptInfo}
instance StrEncoding MsgReceipt where
strEncode MsgReceipt {agentMsgId, msgRcptStatus} =
strEncode agentMsgId <> ":" <> strEncode msgRcptStatus
strP = do
agentMsgId <- strP <* A.char ':'
msgRcptStatus <- strP
pure MsgReceipt {agentMsgId, msgRcptStatus}
instance forall m. ConnectionModeI m => StrEncoding (ConnectionRequestUri m) where
strEncode = \case
CRInvitationUri crData e2eParams -> crEncode "invitation" crData (Just e2eParams)
@@ -1234,7 +1321,7 @@ deriving instance Show AConnectionRequestUri
data ConnReqUriData = ConnReqUriData
{ crScheme :: ConnReqScheme,
crAgentVRange :: VersionRange,
crSmpQueues :: L.NonEmpty SMPQueueUri,
crSmpQueues :: NonEmpty SMPQueueUri,
crClientData :: Maybe CRClientData
}
deriving (Eq, Show)
@@ -1296,7 +1383,7 @@ instance StrEncoding MsgIntegrity where
strP = "OK" $> MsgOk <|> "ERR " *> (MsgError <$> strP)
strEncode = \case
MsgOk -> "OK"
MsgError e -> "ERR" <> strEncode e
MsgError e -> "ERR " <> strEncode e
instance ToJSON MsgIntegrity where
toJSON = J.genericToJSON $ sumTypeJSON fstToLower
@@ -1316,7 +1403,7 @@ data MsgErrorType
instance StrEncoding MsgErrorType where
strP =
"ID " *> (MsgBadId <$> A.decimal)
<|> "IDS " *> (MsgSkipped <$> A.decimal <* A.space <*> A.decimal)
<|> "NO_ID " *> (MsgSkipped <$> A.decimal <* A.space <*> A.decimal)
<|> "HASH" $> MsgBadHash
<|> "DUPLICATE" $> MsgDuplicate
strEncode = \case
@@ -1557,6 +1644,7 @@ instance StrEncoding ACmdTag where
"MERR" -> ct MERR_
"MSG" -> ct MSG_
"ACK" -> t ACK_
"RCVD" -> ct RCVD_
"SWCH" -> t SWCH_
"OFF" -> t OFF_
"DEL" -> t DEL_
@@ -1611,6 +1699,7 @@ instance (APartyI p, AEntityI e) => StrEncoding (ACommandTag p e) where
MERR_ -> "MERR"
MSG_ -> "MSG"
ACK_ -> "ACK"
RCVD_ -> "RCVD"
SWCH_ -> "SWCH"
OFF_ -> "OFF"
DEL_ -> "DEL"
@@ -1654,7 +1743,7 @@ commandP binaryP =
RJCT_ -> s (RJCT <$> A.takeByteString)
SUB_ -> pure SUB
SEND_ -> s (SEND <$> smpP <* A.space <*> binaryP)
ACK_ -> s (ACK <$> A.decimal)
ACK_ -> s (ACK <$> A.decimal <*> optional (A.space *> binaryP))
SWCH_ -> pure SWCH
OFF_ -> pure OFF
DEL_ -> pure DEL
@@ -1676,7 +1765,8 @@ commandP binaryP =
MID_ -> s (MID <$> A.decimal)
SENT_ -> s (SENT <$> A.decimal)
MERR_ -> s (MERR <$> A.decimal <* A.space <*> strP)
MSG_ -> s (MSG <$> msgMetaP <* A.space <*> smpP <* A.space <*> binaryP)
MSG_ -> s (MSG <$> strP <* A.space <*> smpP <* A.space <*> binaryP)
RCVD_ -> s (RCVD <$> strP <* A.space <*> strP)
DEL_RCVQ_ -> s (DEL_RCVQ <$> strP_ <*> strP_ <*> strP)
DEL_CONN_ -> pure DEL_CONN
DEL_USER_ -> s (DEL_USER <$> strP)
@@ -1701,13 +1791,6 @@ commandP binaryP =
in case ds of
[] -> Left "no sender file description"
sd : rds -> SFDONE <$> strDecode (encodeUtf8 sd) <*> mapM (strDecode . encodeUtf8) rds
msgMetaP = do
integrity <- strP
recipient <- " R=" *> partyMeta A.decimal
broker <- " B=" *> partyMeta base64P
sndMsgId <- " S=" *> A.decimal
pure MsgMeta {integrity, recipient, broker, sndMsgId}
partyMeta idParser = (,) <$> idParser <* A.char ',' <*> tsISO8601P
parseCommand :: ByteString -> Either AgentErrorType ACmd
parseCommand = parse (commandP A.takeByteString) $ CMD SYNTAX
@@ -1736,8 +1819,9 @@ serializeCommand = \case
MID mId -> s (MID_, Str $ bshow mId)
SENT mId -> s (SENT_, Str $ bshow mId)
MERR mId e -> s (MERR_, Str $ bshow mId, e)
MSG msgMeta msgFlags msgBody -> B.unwords [s MSG_, serializeMsgMeta msgMeta, smpEncode msgFlags, serializeBinary msgBody]
ACK mId -> s (ACK_, Str $ bshow mId)
MSG msgMeta msgFlags msgBody -> B.unwords [s MSG_, s msgMeta, smpEncode msgFlags, serializeBinary msgBody]
ACK mId rcptInfo_ -> s (ACK_, Str $ bshow mId) <> maybe "" (B.cons ' ' . serializeBinary) rcptInfo_
RCVD msgMeta rcpts -> s (RCVD_, msgMeta, rcpts)
SWCH -> s SWCH_
OFF -> s OFF_
DEL -> s DEL_
@@ -1759,19 +1843,9 @@ serializeCommand = \case
where
s :: StrEncoding a => a -> ByteString
s = strEncode
showTs :: UTCTime -> ByteString
showTs = B.pack . formatISO8601Millis
connections :: [ConnId] -> ByteString
connections = B.intercalate "," . map strEncode
sfDone sd rds = B.intercalate fdSeparator $ strEncode sd : map strEncode rds
serializeMsgMeta :: MsgMeta -> ByteString
serializeMsgMeta MsgMeta {integrity, recipient = (rmId, rTs), broker = (bmId, bTs), sndMsgId} =
B.unwords
[ strEncode integrity,
"R=" <> bshow rmId <> "," <> showTs rTs,
"B=" <> encode bmId <> "," <> showTs bTs,
"S=" <> bshow sndMsgId
]
serializeBinary :: ByteString -> ByteString
serializeBinary body = bshow (B.length body) <> "\n" <> body
+14
View File
@@ -315,6 +315,9 @@ ratchetSyncAllowed cData@ConnData {ratchetSyncState} =
ratchetSyncSupported' :: ConnData -> Bool
ratchetSyncSupported' ConnData {connAgentVersion} = connAgentVersion >= 3
messageRcptsSupported :: ConnData -> Bool
messageRcptsSupported ConnData {connAgentVersion} = connAgentVersion >= 4
-- this function should be mirrored in the clients
ratchetSyncSendProhibited :: ConnData -> Bool
ratchetSyncSendProhibited ConnData {ratchetSyncState} =
@@ -506,7 +509,10 @@ data RcvMsgData = RcvMsgData
data RcvMsg = RcvMsg
{ internalId :: InternalId,
msgMeta :: MsgMeta,
msgType :: AgentMessageType,
msgBody :: MsgBody,
internalHash :: MsgHash,
msgReceipt :: Maybe MsgReceipt, -- if this message is a delivery receipt
userAck :: Bool
}
@@ -521,6 +527,14 @@ data SndMsgData = SndMsgData
prevMsgHash :: MsgHash
}
data SndMsg = SndMsg
{ internalId :: InternalId,
internalSndId :: InternalSndId,
msgType :: AgentMessageType,
internalHash :: MsgHash,
msgReceipt :: Maybe MsgReceipt
}
data PendingMsgData = PendingMsgData
{ msgId :: InternalId,
msgType :: AgentMessageType,
+108 -20
View File
@@ -56,8 +56,8 @@ module Simplex.Messaging.Agent.Store.SQLite
getDeletedConnIds,
setConnRatchetSync,
addProcessedRatchetKeyHash,
checkProcessedRatchetKeyHashExists,
deleteProcessedRatchetKeyHashesExpired,
checkRatchetKeyHashExists,
deleteRatchetKeyHashesExpired,
getRcvConn,
getRcvQueueById,
getSndQueueById,
@@ -99,16 +99,21 @@ module Simplex.Messaging.Agent.Store.SQLite
updateSndIds,
createSndMsg,
createSndMsgDelivery,
getSndMsgViaRcpt,
updateSndMsgRcpt,
getPendingMsgData,
updatePendingMsgRIState,
getPendingMsgs,
deletePendingMsgs,
setMsgUserAck,
getRcvMsg,
getLastMsg,
checkRcvMsgHashExists,
deleteMsg,
deleteDeliveredSndMsg,
deleteSndMsgDelivery,
deleteRcvMsgHashesExpired,
deleteSndMsgsExpired,
-- Double ratchet persistence
createRatchetX3dhKeys,
getRatchetX3dhKeys,
@@ -893,6 +898,31 @@ createSndMsgDelivery :: DB.Connection -> ConnId -> SndQueue -> InternalId -> IO
createSndMsgDelivery db connId SndQueue {dbQueueId} msgId =
DB.execute db "INSERT INTO snd_message_deliveries (conn_id, snd_queue_id, internal_id) VALUES (?, ?, ?)" (connId, dbQueueId, msgId)
getSndMsgViaRcpt :: DB.Connection -> ConnId -> InternalSndId -> IO (Either StoreError SndMsg)
getSndMsgViaRcpt db connId sndMsgId =
firstRow toSndMsg SEMsgNotFound $
DB.query
db
[sql|
SELECT s.internal_id, m.msg_type, s.internal_hash, s.rcpt_internal_id, s.rcpt_status
FROM snd_messages s
JOIN messages m ON s.internal_id = m.internal_id
WHERE s.conn_id = ? AND s.internal_snd_id = ?
|]
(connId, sndMsgId)
where
toSndMsg :: (InternalId, AgentMessageType, MsgHash, Maybe AgentMsgId, Maybe MsgReceiptStatus) -> SndMsg
toSndMsg (internalId, msgType, internalHash, rcptInternalId_, rcptStatus_) =
let msgReceipt = MsgReceipt <$> rcptInternalId_ <*> rcptStatus_
in SndMsg {internalId, internalSndId = sndMsgId, msgType, internalHash, msgReceipt}
updateSndMsgRcpt :: DB.Connection -> ConnId -> InternalSndId -> MsgReceipt -> IO ()
updateSndMsgRcpt db connId sndMsgId MsgReceipt {agentMsgId, msgRcptStatus} =
DB.execute
db
"UPDATE snd_messages SET rcpt_internal_id = ?, rcpt_status = ? WHERE conn_id = ? AND internal_snd_id = ?"
(agentMsgId, msgRcptStatus, connId, sndMsgId)
getPendingMsgData :: DB.Connection -> ConnId -> InternalId -> IO (Either StoreError (Maybe RcvQueue, PendingMsgData))
getPendingMsgData db connId msgId = do
rq_ <- L.head <$$> getRcvQueuesByConnId_ db connId
@@ -929,32 +959,51 @@ deletePendingMsgs db connId SndQueue {dbQueueId} =
setMsgUserAck :: DB.Connection -> ConnId -> InternalId -> IO (Either StoreError (RcvQueue, SMP.MsgId))
setMsgUserAck db connId agentMsgId = runExceptT $ do
liftIO $ DB.execute db "UPDATE rcv_messages SET user_ack = ? WHERE conn_id = ? AND internal_id = ?" (True, connId, agentMsgId)
(dbRcvId, srvMsgId) <-
ExceptT . firstRow id SEMsgNotFound $
DB.query db "SELECT rcv_queue_id, broker_id FROM rcv_messages WHERE conn_id = ? AND internal_id = ?" (connId, agentMsgId)
rq <- ExceptT $ getRcvQueueById db connId dbRcvId
liftIO $ DB.execute db "UPDATE rcv_messages SET user_ack = ? WHERE conn_id = ? AND internal_id = ?" (True, connId, agentMsgId)
pure (rq, srvMsgId)
getLastMsg :: DB.Connection -> ConnId -> SMP.MsgId -> IO (Maybe RcvMsg)
getLastMsg db connId msgId =
maybeFirstRow rcvMsg $
getRcvMsg :: DB.Connection -> ConnId -> InternalId -> IO (Either StoreError RcvMsg)
getRcvMsg db connId agentMsgId =
firstRow toRcvMsg SEMsgNotFound $
DB.query
db
[sql|
SELECT
r.internal_id, m.internal_ts, r.broker_id, r.broker_ts, r.external_snd_id, r.integrity,
m.msg_body, r.user_ack
r.internal_id, m.internal_ts, r.broker_id, r.broker_ts, r.external_snd_id, r.integrity, r.internal_hash,
m.msg_type, m.msg_body, s.internal_id, s.rcpt_status, r.user_ack
FROM rcv_messages r
JOIN messages m ON r.internal_id = m.internal_id
LEFT JOIN snd_messages s ON s.rcpt_internal_id = r.internal_id
WHERE r.conn_id = ? AND r.internal_id = ?
|]
(connId, agentMsgId)
getLastMsg :: DB.Connection -> ConnId -> SMP.MsgId -> IO (Maybe RcvMsg)
getLastMsg db connId msgId =
maybeFirstRow toRcvMsg $
DB.query
db
[sql|
SELECT
r.internal_id, m.internal_ts, r.broker_id, r.broker_ts, r.external_snd_id, r.integrity, r.internal_hash,
m.msg_type, m.msg_body, s.internal_id, s.rcpt_status, r.user_ack
FROM rcv_messages r
JOIN messages m ON r.internal_id = m.internal_id
JOIN connections c ON r.conn_id = c.conn_id AND c.last_internal_msg_id = r.internal_id
LEFT JOIN snd_messages s ON s.rcpt_internal_id = r.internal_id
WHERE r.conn_id = ? AND r.broker_id = ?
|]
(connId, msgId)
where
rcvMsg (agentMsgId, internalTs, brokerId, brokerTs, sndMsgId, integrity, msgBody, userAck) =
let msgMeta = MsgMeta {recipient = (agentMsgId, internalTs), broker = (brokerId, brokerTs), sndMsgId, integrity}
in RcvMsg {internalId = InternalId agentMsgId, msgMeta, msgBody, userAck}
toRcvMsg :: (Int64, InternalTs, BrokerId, BrokerTs, AgentMsgId, MsgIntegrity, MsgHash, AgentMessageType, MsgBody, Maybe AgentMsgId, Maybe MsgReceiptStatus, Bool) -> RcvMsg
toRcvMsg (agentMsgId, internalTs, brokerId, brokerTs, sndMsgId, integrity, internalHash, msgType, msgBody, rcptInternalId_, rcptStatus_, userAck) =
let msgMeta = MsgMeta {recipient = (agentMsgId, internalTs), broker = (brokerId, brokerTs), sndMsgId, integrity}
msgReceipt = MsgReceipt <$> rcptInternalId_ <*> rcptStatus_
in RcvMsg {internalId = InternalId agentMsgId, msgMeta, msgType, msgBody, internalHash, msgReceipt, userAck}
checkRcvMsgHashExists :: DB.Connection -> ConnId -> ByteString -> IO Bool
checkRcvMsgHashExists db connId hash = do
@@ -971,20 +1020,55 @@ deleteMsg :: DB.Connection -> ConnId -> InternalId -> IO ()
deleteMsg db connId msgId =
DB.execute db "DELETE FROM messages WHERE conn_id = ? AND internal_id = ?;" (connId, msgId)
deleteSndMsgDelivery :: DB.Connection -> ConnId -> SndQueue -> InternalId -> IO ()
deleteSndMsgDelivery db connId SndQueue {dbQueueId} msgId = do
deleteMsgContent :: DB.Connection -> ConnId -> InternalId -> IO ()
deleteMsgContent db connId msgId =
DB.execute db "UPDATE messages SET msg_body = x'' WHERE conn_id = ? AND internal_id = ?;" (connId, msgId)
deleteDeliveredSndMsg :: DB.Connection -> ConnId -> InternalId -> IO ()
deleteDeliveredSndMsg db connId msgId = do
cnt <- countPendingSndDeliveries_ db connId msgId
when (cnt == 0) $ deleteMsg db connId msgId
deleteSndMsgDelivery :: DB.Connection -> ConnId -> SndQueue -> InternalId -> Bool -> IO ()
deleteSndMsgDelivery db connId SndQueue {dbQueueId} msgId keepForReceipt = do
DB.execute
db
"DELETE FROM snd_message_deliveries WHERE conn_id = ? AND snd_queue_id = ? AND internal_id = ?"
(connId, dbQueueId, msgId)
(Only (cnt :: Int) : _) <- DB.query db "SELECT count(*) FROM snd_message_deliveries WHERE conn_id = ? AND internal_id = ?" (connId, msgId)
when (cnt == 0) $ deleteMsg db connId msgId
cnt <- countPendingSndDeliveries_ db connId msgId
when (cnt == 0) $ do
del <-
maybeFirstRow id (DB.query db "SELECT rcpt_internal_id, rcpt_status FROM snd_messages WHERE conn_id = ? AND internal_id = ?" (connId, msgId)) >>= \case
Just (Just (_ :: Int64), Just MROk) -> pure deleteMsg
_ -> pure $ if keepForReceipt then deleteMsgContent else deleteMsg
del db connId msgId
countPendingSndDeliveries_ :: DB.Connection -> ConnId -> InternalId -> IO Int
countPendingSndDeliveries_ db connId msgId = do
(Only cnt : _) <- DB.query db "SELECT count(*) FROM snd_message_deliveries WHERE conn_id = ? AND internal_id = ?" (connId, msgId)
pure cnt
deleteRcvMsgHashesExpired :: DB.Connection -> NominalDiffTime -> IO ()
deleteRcvMsgHashesExpired db ttl = do
cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime
DB.execute db "DELETE FROM encrypted_rcv_message_hashes WHERE created_at < ?" (Only cutoffTs)
deleteSndMsgsExpired :: DB.Connection -> NominalDiffTime -> IO ()
deleteSndMsgsExpired db ttl = do
cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime
DB.execute
db
[sql|
DELETE FROM messages
WHERE internal_id IN (
SELECT s.internal_id
FROM snd_messages s
JOIN messages m USING (internal_id)
WHERE m.internal_ts < ?
)
|]
(Only cutoffTs)
createRatchetX3dhKeys :: DB.Connection -> ConnId -> C.PrivateKeyX448 -> C.PrivateKeyX448 -> IO ()
createRatchetX3dhKeys db connId x3dhPrivKey1 x3dhPrivKey2 =
DB.execute db "INSERT INTO ratchets (conn_id, x3dh_priv_key_1, x3dh_priv_key_2) VALUES (?, ?, ?)" (connId, x3dhPrivKey1, x3dhPrivKey2)
@@ -1504,6 +1588,10 @@ instance ToField AgentCommandTag where toField = toField . strEncode
instance FromField AgentCommandTag where fromField = blobFieldParser strP
instance ToField MsgReceiptStatus where toField = toField . decodeLatin1 . strEncode
instance FromField MsgReceiptStatus where fromField = fromTextField_ $ eitherToMaybe . strDecode . encodeUtf8
listToEither :: e -> [a] -> Either e a
listToEither _ (x : _) = Right x
listToEither e _ = Left e
@@ -1682,8 +1770,8 @@ addProcessedRatchetKeyHash :: DB.Connection -> ConnId -> ByteString -> IO ()
addProcessedRatchetKeyHash db connId hash =
DB.execute db "INSERT INTO processed_ratchet_key_hashes (conn_id, hash) VALUES (?,?)" (connId, hash)
checkProcessedRatchetKeyHashExists :: DB.Connection -> ConnId -> ByteString -> IO Bool
checkProcessedRatchetKeyHashExists db connId hash = do
checkRatchetKeyHashExists :: DB.Connection -> ConnId -> ByteString -> IO Bool
checkRatchetKeyHashExists db connId hash = do
fromMaybe False
<$> maybeFirstRow
fromOnly
@@ -1693,8 +1781,8 @@ checkProcessedRatchetKeyHashExists db connId hash = do
(connId, hash)
)
deleteProcessedRatchetKeyHashesExpired :: DB.Connection -> NominalDiffTime -> IO ()
deleteProcessedRatchetKeyHashesExpired db ttl = do
deleteRatchetKeyHashesExpired :: DB.Connection -> NominalDiffTime -> IO ()
deleteRatchetKeyHashesExpired db ttl = do
cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime
DB.execute db "DELETE FROM processed_ratchet_key_hashes WHERE created_at < ?" (Only cutoffTs)
@@ -63,6 +63,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230510_files_pending_r
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230516_encrypted_rcv_message_hashes
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230531_switch_status
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230615_ratchet_sync
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230701_delivery_receipts
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (dropPrefix, sumTypeJSON)
import Simplex.Messaging.Transport.Client (TransportHost)
@@ -91,7 +92,8 @@ schemaMigrations =
("m20230510_files_pending_replicas_indexes", m20230510_files_pending_replicas_indexes, Just down_m20230510_files_pending_replicas_indexes),
("m20230516_encrypted_rcv_message_hashes", m20230516_encrypted_rcv_message_hashes, Just down_m20230516_encrypted_rcv_message_hashes),
("m20230531_switch_status", m20230531_switch_status, Just down_m20230531_switch_status),
("m20230615_ratchet_sync", m20230615_ratchet_sync, Just down_m20230615_ratchet_sync)
("m20230615_ratchet_sync", m20230615_ratchet_sync, Just down_m20230615_ratchet_sync),
("m20230701_delivery_receipts", m20230701_delivery_receipts, Just down_m20230701_delivery_receipts)
]
-- | The list of migrations in ascending order by date
@@ -0,0 +1,24 @@
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230701_delivery_receipts where
import Database.SQLite.Simple (Query)
import Database.SQLite.Simple.QQ (sql)
m20230701_delivery_receipts :: Query
m20230701_delivery_receipts =
[sql|
ALTER TABLE snd_messages ADD COLUMN rcpt_internal_id INTEGER;
ALTER TABLE snd_messages ADD COLUMN rcpt_status TEXT;
CREATE INDEX idx_snd_messages_rcpt_internal_id ON snd_messages(conn_id, rcpt_internal_id);
|]
down_m20230701_delivery_receipts :: Query
down_m20230701_delivery_receipts =
[sql|
DROP INDEX idx_snd_messages_rcpt_internal_id;
ALTER TABLE snd_messages DROP COLUMN rcpt_internal_id;
ALTER TABLE snd_messages DROP COLUMN rcpt_status;
|]
@@ -119,6 +119,8 @@ CREATE TABLE snd_messages(
previous_msg_hash BLOB NOT NULL DEFAULT x'',
retry_int_slow INTEGER,
retry_int_fast INTEGER,
rcpt_internal_id INTEGER,
rcpt_status TEXT,
PRIMARY KEY(conn_id, internal_snd_id),
FOREIGN KEY(conn_id, internal_id) REFERENCES messages
ON DELETE CASCADE
@@ -461,3 +463,7 @@ CREATE INDEX idx_processed_ratchet_key_hashes_hash ON processed_ratchet_key_hash
conn_id,
hash
);
CREATE INDEX idx_snd_messages_rcpt_internal_id ON snd_messages(
conn_id,
rcpt_internal_id
);
+2 -2
View File
@@ -114,7 +114,7 @@ connectionRequestTests =
<> urlEncode True testDhKeyStrUri
<> "&e2e=v%3D1%26x3dh%3DMEIwBQYDK2VvAzkAmKuSYeQ_m0SixPDS8Wq8VBaTS1cW-Lp0n0h4Diu-kUpR-qXx4SDJ32YGEFoGFGSbGPry5Ychr6U%3D%2CMEIwBQYDK2VvAzkAmKuSYeQ_m0SixPDS8Wq8VBaTS1cW-Lp0n0h4Diu-kUpR-qXx4SDJ32YGEFoGFGSbGPry5Ychr6U%3D"
strEncode connectionRequestCurrentRange
`shouldBe` "https://simplex.chat/invitation#/?v=1-3&smp=smp%3A%2F%2F1234-w%3D%3D%40smp.simplex.im%3A5223%2F3456-w%3D%3D%23%2F%3Fv%3D1%26dh%3D"
`shouldBe` "https://simplex.chat/invitation#/?v=1-4&smp=smp%3A%2F%2F1234-w%3D%3D%40smp.simplex.im%3A5223%2F3456-w%3D%3D%23%2F%3Fv%3D1%26dh%3D"
<> urlEncode True testDhKeyStrUri
<> "%2Csmp%3A%2F%2F1234-w%3D%3D%40smp.simplex.im%3A5223%2F3456-w%3D%3D%23%2F%3Fv%3D1%26dh%3D"
<> urlEncode True testDhKeyStrUri
@@ -158,7 +158,7 @@ connectionRequestTests =
<> testDhKeyStrUri
<> "&e2e=extra_key%3Dnew%26v%3D1-2%26x3dh%3DMEIwBQYDK2VvAzkAmKuSYeQ_m0SixPDS8Wq8VBaTS1cW-Lp0n0h4Diu-kUpR-qXx4SDJ32YGEFoGFGSbGPry5Ychr6U%3D%2CMEIwBQYDK2VvAzkAmKuSYeQ_m0SixPDS8Wq8VBaTS1cW-Lp0n0h4Diu-kUpR-qXx4SDJ32YGEFoGFGSbGPry5Ychr6U%3D"
<> "&some_new_param=abc"
<> "&v=1-3"
<> "&v=1-4"
)
`shouldBe` Right connectionRequestCurrentRange
strDecode
+110 -37
View File
@@ -104,6 +104,9 @@ pGet c = do
pattern Msg :: MsgBody -> ACommand 'Agent e
pattern Msg msgBody <- MSG MsgMeta {integrity = MsgOk} _ msgBody
pattern Rcvd :: AgentMsgId -> ACommand 'Agent e
pattern Rcvd agentMsgId <- RCVD MsgMeta {integrity = MsgOk} [MsgReceipt {agentMsgId, msgRcptStatus = MROk}]
smpCfgVPrev :: ProtocolClientConfig
smpCfgVPrev = (smpCfg agentCfg) {serverVRange = serverVRangePrev}
where
@@ -293,6 +296,9 @@ functionalAPITests t = do
describe "getRatchetAdHash" $
it "should return the same data for both peers" $
withSmpServer t testRatchetAdHash
describe "Delivery receipts" $ do
it "should send and receive delivery receipt" $ withSmpServer t testDeliveryReceipts
it "should send delivery receipt only in connection v3+" $ testDeliveryReceiptsVersion t
testBasicAuth :: ATransport -> Bool -> (Maybe BasicAuth, Version) -> (Maybe BasicAuth, Version) -> (Maybe BasicAuth, Version) -> IO Int
testBasicAuth t allowNewQueues srv@(srvAuth, srvVersion) clnt1 clnt2 = do
@@ -352,17 +358,17 @@ runAgentClientTest alice bob baseId = do
2 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?"
get alice ##> ("", bobId, SENT $ baseId + 2)
get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False
ackMessage bob aliceId $ baseId + 1
ackMessage bob aliceId (baseId + 1) Nothing
get bob =##> \case ("", c, Msg "how are you?") -> c == aliceId; _ -> False
ackMessage bob aliceId $ baseId + 2
ackMessage bob aliceId (baseId + 2) Nothing
3 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "hello too"
get bob ##> ("", aliceId, SENT $ baseId + 3)
4 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "message 1"
get bob ##> ("", aliceId, SENT $ baseId + 4)
get alice =##> \case ("", c, Msg "hello too") -> c == bobId; _ -> False
ackMessage alice bobId $ baseId + 3
ackMessage alice bobId (baseId + 3) Nothing
get alice =##> \case ("", c, Msg "message 1") -> c == bobId; _ -> False
ackMessage alice bobId $ baseId + 4
ackMessage alice bobId (baseId + 4) Nothing
suspendConnection alice bobId
5 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "message 2"
get bob ##> ("", aliceId, MERR (baseId + 5) (SMP AUTH))
@@ -389,17 +395,17 @@ runAgentClientContactTest alice bob baseId = do
2 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?"
get alice ##> ("", bobId, SENT $ baseId + 2)
get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False
ackMessage bob aliceId $ baseId + 1
ackMessage bob aliceId (baseId + 1) Nothing
get bob =##> \case ("", c, Msg "how are you?") -> c == aliceId; _ -> False
ackMessage bob aliceId $ baseId + 2
ackMessage bob aliceId (baseId + 2) Nothing
3 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "hello too"
get bob ##> ("", aliceId, SENT $ baseId + 3)
4 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "message 1"
get bob ##> ("", aliceId, SENT $ baseId + 4)
get alice =##> \case ("", c, Msg "hello too") -> c == bobId; _ -> False
ackMessage alice bobId $ baseId + 3
ackMessage alice bobId (baseId + 3) Nothing
get alice =##> \case ("", c, Msg "message 1") -> c == bobId; _ -> False
ackMessage alice bobId $ baseId + 4
ackMessage alice bobId (baseId + 4) Nothing
suspendConnection alice bobId
5 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "message 2"
get bob ##> ("", aliceId, MERR (baseId + 5) (SMP AUTH))
@@ -702,7 +708,7 @@ testDuplicateMessage t = do
runRight_ $ do
subscribeConnection bob1 aliceId
get bob1 =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False
ackMessage bob1 aliceId 4
ackMessage bob1 aliceId 4 Nothing
5 <- sendMessage alice bobId SMP.noMsgFlags "hello 2"
get alice ##> ("", bobId, SENT 5)
get bob1 =##> \case ("", c, Msg "hello 2") -> c == aliceId; _ -> False
@@ -714,7 +720,7 @@ testDuplicateMessage t = do
-- commenting two lines below and uncommenting further two lines would also runRight_,
-- it is the scenario tested above, when the message was not acknowledged by the user
threadDelay 200000
Left (BROKER _ TIMEOUT) <- runExceptT $ ackMessage bob1 aliceId 5
Left (BROKER _ TIMEOUT) <- runExceptT $ ackMessage bob1 aliceId 5 Nothing
disconnectAgentClient alice
disconnectAgentClient bob1
@@ -727,7 +733,7 @@ testDuplicateMessage t = do
subscribeConnection bob2 aliceId
subscribeConnection alice2 bobId
-- get bob2 =##> \case ("", c, Msg "hello 2") -> c == aliceId; _ -> False
-- ackMessage bob2 aliceId 5
-- ackMessage bob2 aliceId 5 Nothing
-- message 2 is not delivered again, even though it was delivered to the agent
6 <- sendMessage alice2 bobId SMP.noMsgFlags "hello 3"
get alice2 ##> ("", bobId, SENT 6)
@@ -743,7 +749,7 @@ testSkippedMessages t = do
4 <- sendMessage alice bobId SMP.noMsgFlags "hello"
get alice ##> ("", bobId, SENT 4)
get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False
ackMessage bob aliceId 4
ackMessage bob aliceId 4 Nothing
disconnectAgentClient bob
@@ -773,12 +779,12 @@ testSkippedMessages t = do
8 <- sendMessage alice2 bobId SMP.noMsgFlags "hello 5"
get alice2 ##> ("", bobId, SENT 8)
get bob2 =##> \case ("", c, MSG MsgMeta {integrity = MsgError {errorInfo = MsgSkipped {fromMsgId = 4, toMsgId = 6}}} _ "hello 5") -> c == aliceId; _ -> False
ackMessage bob2 aliceId 5
ackMessage bob2 aliceId 5 Nothing
9 <- sendMessage alice2 bobId SMP.noMsgFlags "hello 6"
get alice2 ##> ("", bobId, SENT 9)
get bob2 =##> \case ("", c, Msg "hello 6") -> c == aliceId; _ -> False
ackMessage bob2 aliceId 6
ackMessage bob2 aliceId 6 Nothing
testRatchetSync :: HasCallStack => ATransport -> IO ()
testRatchetSync t = do
@@ -807,24 +813,24 @@ setupDesynchronizedRatchet alice bob = do
4 <- sendMessage alice bobId SMP.noMsgFlags "hello"
get alice ##> ("", bobId, SENT 4)
get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False
ackMessage bob aliceId 4
ackMessage bob aliceId 4 Nothing
5 <- sendMessage bob aliceId SMP.noMsgFlags "hello 2"
get bob ##> ("", aliceId, SENT 5)
get alice =##> \case ("", c, Msg "hello 2") -> c == bobId; _ -> False
ackMessage alice bobId 5
ackMessage alice bobId 5 Nothing
liftIO $ copyFile testDB2 (testDB2 <> ".bak")
6 <- sendMessage alice bobId SMP.noMsgFlags "hello 3"
get alice ##> ("", bobId, SENT 6)
get bob =##> \case ("", c, Msg "hello 3") -> c == aliceId; _ -> False
ackMessage bob aliceId 6
ackMessage bob aliceId 6 Nothing
7 <- sendMessage bob aliceId SMP.noMsgFlags "hello 4"
get bob ##> ("", aliceId, SENT 7)
get alice =##> \case ("", c, Msg "hello 4") -> c == bobId; _ -> False
ackMessage alice bobId 7
ackMessage alice bobId 7 Nothing
disconnectAgentClient bob
@@ -1056,7 +1062,7 @@ testSuspendingAgent = do
4 <- sendMessage a bId SMP.noMsgFlags "hello"
get a ##> ("", bId, SENT 4)
get b =##> \case ("", c, Msg "hello") -> c == aId; _ -> False
ackMessage b aId 4
ackMessage b aId 4 Nothing
suspendAgent b 1000000
get' b ##> ("", "", SUSPENDED)
5 <- sendMessage a bId SMP.noMsgFlags "hello 2"
@@ -1074,7 +1080,7 @@ testSuspendingAgentCompleteSending t = do
4 <- sendMessage a bId SMP.noMsgFlags "hello"
get a ##> ("", bId, SENT 4)
get b =##> \case ("", c, Msg "hello") -> c == aId; _ -> False
ackMessage b aId 4
ackMessage b aId 4 Nothing
pure (aId, bId)
runRight_ $ do
@@ -1093,9 +1099,9 @@ testSuspendingAgentCompleteSending t = do
pGet a =##> \case ("", c, APC _ (Msg "hello too")) -> c == bId; ("", "", APC _ UP {}) -> True; _ -> False
pGet a =##> \case ("", c, APC _ (Msg "hello too")) -> c == bId; ("", "", APC _ UP {}) -> True; _ -> False
ackMessage a bId 5
ackMessage a bId 5 Nothing
get a =##> \case ("", c, Msg "how are you?") -> c == bId; _ -> False
ackMessage a bId 6
ackMessage a bId 6 Nothing
testSuspendingAgentTimeout :: ATransport -> IO ()
testSuspendingAgentTimeout t = do
@@ -1106,7 +1112,7 @@ testSuspendingAgentTimeout t = do
4 <- sendMessage a bId SMP.noMsgFlags "hello"
get a ##> ("", bId, SENT 4)
get b =##> \case ("", c, Msg "hello") -> c == aId; _ -> False
ackMessage b aId 4
ackMessage b aId 4 Nothing
pure (aId, bId)
runRight_ $ do
@@ -1205,20 +1211,20 @@ testAsyncCommands = do
2 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?"
get alice ##> ("", bobId, SENT $ baseId + 2)
get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False
ackMessageAsync bob "4" aliceId $ baseId + 1
ackMessageAsync bob "4" aliceId (baseId + 1) Nothing
("4", _, OK) <- get bob
get bob =##> \case ("", c, Msg "how are you?") -> c == aliceId; _ -> False
ackMessageAsync bob "5" aliceId $ baseId + 2
ackMessageAsync bob "5" aliceId (baseId + 2) Nothing
("5", _, OK) <- get bob
3 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "hello too"
get bob ##> ("", aliceId, SENT $ baseId + 3)
4 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "message 1"
get bob ##> ("", aliceId, SENT $ baseId + 4)
get alice =##> \case ("", c, Msg "hello too") -> c == bobId; _ -> False
ackMessageAsync alice "6" bobId $ baseId + 3
ackMessageAsync alice "6" bobId (baseId + 3) Nothing
("6", _, OK) <- get alice
get alice =##> \case ("", c, Msg "message 1") -> c == bobId; _ -> False
ackMessageAsync alice "7" bobId $ baseId + 4
ackMessageAsync alice "7" bobId (baseId + 4) Nothing
("7", _, OK) <- get alice
deleteConnectionAsync alice bobId
get alice =##> \case ("", c, DEL_RCVQ _ _ Nothing) -> c == bobId; _ -> False
@@ -1263,17 +1269,17 @@ testAcceptContactAsync = do
2 <- msgId <$> sendMessage alice bobId SMP.noMsgFlags "how are you?"
get alice ##> ("", bobId, SENT $ baseId + 2)
get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False
ackMessage bob aliceId $ baseId + 1
ackMessage bob aliceId (baseId + 1) Nothing
get bob =##> \case ("", c, Msg "how are you?") -> c == aliceId; _ -> False
ackMessage bob aliceId $ baseId + 2
ackMessage bob aliceId (baseId + 2) Nothing
3 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "hello too"
get bob ##> ("", aliceId, SENT $ baseId + 3)
4 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "message 1"
get bob ##> ("", aliceId, SENT $ baseId + 4)
get alice =##> \case ("", c, Msg "hello too") -> c == bobId; _ -> False
ackMessage alice bobId $ baseId + 3
ackMessage alice bobId (baseId + 3) Nothing
get alice =##> \case ("", c, Msg "message 1") -> c == bobId; _ -> False
ackMessage alice bobId $ baseId + 4
ackMessage alice bobId (baseId + 4) Nothing
suspendConnection alice bobId
5 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "message 2"
get bob ##> ("", aliceId, MERR (baseId + 5) (SMP AUTH))
@@ -1740,7 +1746,7 @@ testSwitch2ConnectionsAbort1 servers = do
withB :: (AgentClient -> IO a) -> IO a
withB = withAgent agentCfg {initialClientId = 1} servers testDB2
testCreateQueueAuth :: (Maybe BasicAuth, Version) -> (Maybe BasicAuth, Version) -> IO Int
testCreateQueueAuth :: HasCallStack => (Maybe BasicAuth, Version) -> (Maybe BasicAuth, Version) -> IO Int
testCreateQueueAuth clnt1 clnt2 = do
a <- getClient clnt1
b <- getClient clnt2
@@ -1772,7 +1778,7 @@ testSMPServerConnectionTest t newQueueBasicAuth srv =
a <- getSMPAgentClient' agentCfg initAgentServers testDB -- initially passed server is not running
runRight $ testProtocolServer a 1 srv
testRatchetAdHash :: IO ()
testRatchetAdHash :: HasCallStack => IO ()
testRatchetAdHash = do
a <- getSMPAgentClient' agentCfg initAgentServers testDB
b <- getSMPAgentClient' agentCfg initAgentServers testDB2
@@ -1782,6 +1788,73 @@ testRatchetAdHash = do
ad2 <- getConnectionRatchetAdHash b aId
liftIO $ ad1 `shouldBe` ad2
testDeliveryReceipts :: HasCallStack => IO ()
testDeliveryReceipts = do
a <- getSMPAgentClient' agentCfg initAgentServers testDB
b <- getSMPAgentClient' agentCfg initAgentServers testDB2
runRight_ $ do
(aId, bId) <- makeConnection a b
-- a sends, b receives and sends delivery receipt
4 <- sendMessage a bId SMP.noMsgFlags "hello"
get a ##> ("", bId, SENT 4)
get b =##> \case ("", c, Msg "hello") -> c == aId; _ -> False
ackMessage b aId 4 $ Just ""
get a =##> \case ("", c, Rcvd 4) -> c == bId; _ -> False
ackMessage a bId 5 Nothing
-- b sends, a receives and sends delivery receipt
6 <- sendMessage b aId SMP.noMsgFlags "hello too"
get b ##> ("", aId, SENT 6)
get a =##> \case ("", c, Msg "hello too") -> c == bId; _ -> False
ackMessage a bId 6 $ Just ""
get b =##> \case ("", c, Rcvd 6) -> c == aId; _ -> False
ackMessage b aId 7 (Just "") `catchError` \e -> liftIO $ e `shouldBe` Agent.CMD PROHIBITED
ackMessage b aId 7 Nothing
testDeliveryReceiptsVersion :: HasCallStack => ATransport -> IO ()
testDeliveryReceiptsVersion t = do
a <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 3} initAgentServers testDB
b <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 3} initAgentServers testDB2
withSmpServerStoreMsgLogOn t testPort $ \_ -> do
(aId, bId) <- runRight $ do
(aId, bId) <- makeConnection a b
checkVersion a bId 3
checkVersion b aId 3
4 <- sendMessage a bId SMP.noMsgFlags "hello"
get a ##> ("", bId, SENT 4)
get b =##> \case ("", c, Msg "hello") -> c == aId; _ -> False
ackMessage b aId 4 $ Just ""
liftIO $ noMessages a "no delivery receipt (unsupported version)"
5 <- sendMessage b aId SMP.noMsgFlags "hello too"
get b ##> ("", aId, SENT 5)
get a =##> \case ("", c, Msg "hello too") -> c == bId; _ -> False
ackMessage a bId 5 $ Just ""
liftIO $ noMessages b "no delivery receipt (unsupported version)"
pure (aId, bId)
disconnectAgentClient a
disconnectAgentClient b
a' <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 4} initAgentServers testDB
b' <- getSMPAgentClient' agentCfg {smpAgentVRange = mkVersionRange 1 4} initAgentServers testDB2
runRight_ $ do
subscribeConnection a' bId
subscribeConnection b' aId
exchangeGreetingsMsgId 6 a' bId b' aId
checkVersion a' bId 4
checkVersion b' aId 4
8 <- sendMessage a' bId SMP.noMsgFlags "hello"
get a' ##> ("", bId, SENT 8)
get b' =##> \case ("", c, Msg "hello") -> c == aId; _ -> False
ackMessage b' aId 8 $ Just ""
get a' =##> \case ("", c, Rcvd 8) -> c == bId; _ -> False
ackMessage a' bId 9 Nothing
10 <- sendMessage b' aId SMP.noMsgFlags "hello too"
get b' ##> ("", aId, SENT 10)
get a' =##> \case ("", c, Msg "hello too") -> c == bId; _ -> False
ackMessage a' bId 10 $ Just ""
get b' =##> \case ("", c, Rcvd 10) -> c == aId; _ -> False
ackMessage b' aId 11 Nothing
testTwoUsers :: HasCallStack => IO ()
testTwoUsers = do
let nc = netCfg initAgentServers
@@ -1898,13 +1971,13 @@ exchangeGreetingsMsgId msgId alice bobId bob aliceId = do
liftIO $ msgId1 `shouldBe` msgId
get alice ##> ("", bobId, SENT msgId)
get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False
ackMessage bob aliceId msgId
ackMessage bob aliceId msgId Nothing
msgId2 <- sendMessage bob aliceId SMP.noMsgFlags "hello too"
let msgId' = msgId + 1
liftIO $ msgId2 `shouldBe` msgId'
get bob ##> ("", aliceId, SENT msgId')
get alice =##> \case ("", c, Msg "hello too") -> c == bobId; _ -> False
ackMessage alice bobId msgId'
ackMessage alice bobId msgId' Nothing
exchangeGreetingsMsgIds :: HasCallStack => AgentClient -> ConnId -> Int64 -> AgentClient -> ConnId -> Int64 -> ExceptT AgentErrorType IO ()
exchangeGreetingsMsgIds alice bobId aliceMsgId bob aliceId bobMsgId = do
@@ -1912,11 +1985,11 @@ exchangeGreetingsMsgIds alice bobId aliceMsgId bob aliceId bobMsgId = do
liftIO $ msgId1 `shouldBe` aliceMsgId
get alice ##> ("", bobId, SENT aliceMsgId)
get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False
ackMessage bob aliceId bobMsgId
ackMessage bob aliceId bobMsgId Nothing
msgId2 <- sendMessage bob aliceId SMP.noMsgFlags "hello too"
let aliceMsgId' = aliceMsgId + 1
bobMsgId' = bobMsgId + 1
liftIO $ msgId2 `shouldBe` bobMsgId'
get bob ##> ("", aliceId, SENT bobMsgId')
get alice =##> \case ("", c, Msg "hello too") -> c == bobId; _ -> False
ackMessage alice bobId aliceMsgId'
ackMessage alice bobId aliceMsgId' Nothing
+14 -14
View File
@@ -253,7 +253,7 @@ testNotificationSubscriptionExistingConnection APNSMockServer {apnsQ} = do
runRight_ $ do
get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False
ackMessage alice bobId $ baseId + 1
ackMessage alice bobId (baseId + 1) Nothing
-- delete notification subscription
toggleConnectionNtfs alice bobId False
liftIO $ threadDelay 250000
@@ -296,13 +296,13 @@ testNotificationSubscriptionNewConnection APNSMockServer {apnsQ} = do
get bob ##> ("", aliceId, SENT $ baseId + 1)
void $ messageNotification apnsQ
get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False
ackMessage alice bobId $ baseId + 1
ackMessage alice bobId (baseId + 1) Nothing
-- alice sends message
2 <- msgId <$> sendMessage alice bobId (SMP.MsgFlags True) "hey there"
get alice ##> ("", bobId, SENT $ baseId + 2)
void $ messageNotification apnsQ
get bob =##> \case ("", c, Msg "hey there") -> c == aliceId; _ -> False
ackMessage bob aliceId $ baseId + 2
ackMessage bob aliceId (baseId + 2) Nothing
-- no unexpected notifications should follow
noNotification apnsQ
where
@@ -343,7 +343,7 @@ testChangeNotificationsMode APNSMockServer {apnsQ} = do
get bob ##> ("", aliceId, SENT $ baseId + 1)
void $ messageNotification apnsQ
get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False
ackMessage alice bobId $ baseId + 1
ackMessage alice bobId (baseId + 1) Nothing
-- set mode to NMPeriodic
NTActive <- registerNtfToken alice tkn NMPeriodic
-- send message, no notification
@@ -352,7 +352,7 @@ testChangeNotificationsMode APNSMockServer {apnsQ} = do
get bob ##> ("", aliceId, SENT $ baseId + 2)
noNotification apnsQ
get alice =##> \case ("", c, Msg "hello again") -> c == bobId; _ -> False
ackMessage alice bobId $ baseId + 2
ackMessage alice bobId (baseId + 2) Nothing
-- set mode to NMInstant
NTActive <- registerNtfToken alice tkn NMInstant
-- send message, receive notification
@@ -361,7 +361,7 @@ testChangeNotificationsMode APNSMockServer {apnsQ} = do
get bob ##> ("", aliceId, SENT $ baseId + 3)
void $ messageNotification apnsQ
get alice =##> \case ("", c, Msg "hello there") -> c == bobId; _ -> False
ackMessage alice bobId $ baseId + 3
ackMessage alice bobId (baseId + 3) Nothing
-- turn off notifications
deleteNtfToken alice tkn
-- send message, no notification
@@ -370,7 +370,7 @@ testChangeNotificationsMode APNSMockServer {apnsQ} = do
get bob ##> ("", aliceId, SENT $ baseId + 4)
noNotification apnsQ
get alice =##> \case ("", c, Msg "why hello there") -> c == bobId; _ -> False
ackMessage alice bobId $ baseId + 4
ackMessage alice bobId (baseId + 4) Nothing
-- turn on notifications, set mode to NMInstant
void $ registerTestToken alice "abcd" NMInstant apnsQ
-- send message, receive notification
@@ -379,7 +379,7 @@ testChangeNotificationsMode APNSMockServer {apnsQ} = do
get bob ##> ("", aliceId, SENT $ baseId + 5)
void $ messageNotification apnsQ
get alice =##> \case ("", c, Msg "hey") -> c == bobId; _ -> False
ackMessage alice bobId $ baseId + 5
ackMessage alice bobId (baseId + 5) Nothing
-- no notifications should follow
noNotification apnsQ
where
@@ -407,7 +407,7 @@ testChangeToken APNSMockServer {apnsQ} = do
get bob ##> ("", aliceId, SENT $ baseId + 1)
void $ messageNotification apnsQ
get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False
ackMessage alice bobId $ baseId + 1
ackMessage alice bobId (baseId + 1) Nothing
pure (aliceId, bobId)
disconnectAgentClient alice
@@ -422,7 +422,7 @@ testChangeToken APNSMockServer {apnsQ} = do
get bob ##> ("", aliceId, SENT $ baseId + 2)
void $ messageNotification apnsQ
get alice1 =##> \case ("", c, Msg "hello there") -> c == bobId; _ -> False
ackMessage alice1 bobId $ baseId + 2
ackMessage alice1 bobId (baseId + 2) Nothing
-- no notifications should follow
noNotification apnsQ
where
@@ -441,7 +441,7 @@ testNotificationsStoreLog t APNSMockServer {apnsQ} = do
get bob ##> ("", aliceId, SENT 4)
void $ messageNotification apnsQ
get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False
ackMessage alice bobId 4
ackMessage alice bobId 4 Nothing
liftIO $ killThread threadId
pure (aliceId, bobId)
@@ -467,7 +467,7 @@ testNotificationsSMPRestart t APNSMockServer {apnsQ} = do
get bob ##> ("", aliceId, SENT 4)
void $ messageNotification apnsQ
get alice =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False
ackMessage alice bobId 4
ackMessage alice bobId 4 Nothing
liftIO $ killThread threadId
pure (aliceId, bobId)
@@ -498,7 +498,7 @@ testNotificationsSMPRestartBatch n t APNSMockServer {apnsQ} = do
get b ##> ("", aliceId, SENT msgId)
void $ messageNotification apnsQ
get a =##> \case ("", c, Msg "hello") -> c == bobId; _ -> False
ackMessage a bobId msgId
ackMessage a bobId msgId Nothing
pure conns
runRight_ @AgentErrorType $ do
@@ -545,7 +545,7 @@ testSwitchNotifications servers APNSMockServer {apnsQ} = do
get b ##> ("", aId, SENT msgId)
void $ messageNotification apnsQ
get a =##> \case ("", c, Msg msg') -> c == bId && msg == msg'; _ -> False
ackMessage a bId msgId
ackMessage a bId msgId Nothing
testMessage "hello"
_ <- switchConnectionAsync a "" bId
switchComplete a bId b aId