ratchet re-synchronization (#774)

* ratchet re-synchronization rfc wip

* additions

* additions, types

* fix tests

* re-sync implementation wip

* re-sync implementation 1st rev.

* test wip

* test passes

* doc

* wording

* improve doc schema

* single agreed state

* refactor (1 state variable)

* allowed -> required

* prohibit enqueue

* enqueue

* send via multiple queues

* test with server offline

* clarify errors

* rename

* more tests

* refactor

* rename AgentRKey

* rename AM_CONN_RATCHET_KEY

* more tests

* rename

* write encoded AgentRatchetInfo to AgentRatchetKey info

* move withConnLock

* refactor qDuplex

* re-create ratchet on receiving second key

* invert condition

* refactor

* simplify

---------

Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com>
This commit is contained in:
spaced4ndy
2023-06-30 14:17:08 +04:00
committed by GitHub
parent a000419bd7
commit 8be2505fa0
12 changed files with 1365 additions and 408 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -83,6 +83,7 @@ data AgentConfig = AgentConfig
initialCleanupDelay :: Int64,
cleanupInterval :: Int64,
rcvMsgHashesTTL :: NominalDiffTime,
processedRatchetKeyHashesTTL :: NominalDiffTime,
rcvFilesTTL :: NominalDiffTime,
sndFilesTTL :: NominalDiffTime,
xftpNotifyErrsOnRetry :: Bool,
@@ -147,6 +148,7 @@ defaultAgentConfig =
initialCleanupDelay = 30 * 1000000, -- 30 seconds
cleanupInterval = 30 * 60 * 1000000, -- 30 minutes
rcvMsgHashesTTL = 30 * nominalDay,
processedRatchetKeyHashesTTL = 30 * nominalDay,
rcvFilesTTL = 2 * nominalDay,
sndFilesTTL = nominalDay,
xftpNotifyErrsOnRetry = True,

View File

@@ -62,6 +62,7 @@ module Simplex.Messaging.Agent.Protocol
RcvSwitchStatus (..),
SndSwitchStatus (..),
QueueDirection (..),
RatchetSyncState (..),
SMPConfirmation (..),
AgentMsgEnvelope (..),
AgentMessage (..),
@@ -98,6 +99,7 @@ module Simplex.Messaging.Agent.Protocol
BrokerErrorType (..),
SMPAgentError (..),
AgentCryptoError (..),
cryptoErrToSyncState,
ATransmission,
ATransmissionOrError,
ARawTransmission,
@@ -324,6 +326,7 @@ data ACommand (p :: AParty) (e :: AEntity) where
DOWN :: SMPServer -> [ConnId] -> ACommand Agent AENone
UP :: SMPServer -> [ConnId] -> ACommand Agent AENone
SWITCH :: QueueDirection -> SwitchPhase -> ConnectionStats -> ACommand Agent AEConn
RSYNC :: RatchetSyncState -> ConnectionStats -> ACommand Agent AEConn
SEND :: MsgFlags -> MsgBody -> ACommand Client AEConn
MID :: AgentMsgId -> ACommand Agent AEConn
SENT :: AgentMsgId -> ACommand Agent AEConn
@@ -382,6 +385,7 @@ data ACommandTag (p :: AParty) (e :: AEntity) where
DOWN_ :: ACommandTag Agent AENone
UP_ :: ACommandTag Agent AENone
SWITCH_ :: ACommandTag Agent AEConn
RSYNC_ :: ACommandTag Agent AEConn
SEND_ :: ACommandTag Client AEConn
MID_ :: ACommandTag Agent AEConn
SENT_ :: ACommandTag Agent AEConn
@@ -433,6 +437,7 @@ aCommandTag = \case
DOWN {} -> DOWN_
UP {} -> UP_
SWITCH {} -> SWITCH_
RSYNC {} -> RSYNC_
SEND {} -> SEND_
MID _ -> MID_
SENT _ -> SENT_
@@ -559,6 +564,41 @@ instance ToJSON SndSwitchStatus where
instance FromJSON SndSwitchStatus where
parseJSON = strParseJSON "SndSwitchStatus"
data RatchetSyncState
= RSOk
| RSAllowed
| RSRequired
| RSStarted
| RSAgreed
deriving (Eq, Show)
instance StrEncoding RatchetSyncState where
strEncode = \case
RSOk -> "ok"
RSAllowed -> "allowed"
RSRequired -> "required"
RSStarted -> "started"
RSAgreed -> "agreed"
strP =
A.takeTill (== ' ') >>= \case
"ok" -> pure RSOk
"allowed" -> pure RSAllowed
"required" -> pure RSRequired
"started" -> pure RSStarted
"agreed" -> pure RSAgreed
_ -> fail "bad RatchetSyncState"
instance FromField RatchetSyncState where fromField = fromTextField_ $ eitherToMaybe . strDecode . encodeUtf8
instance ToField RatchetSyncState where toField = toField . decodeLatin1 . strEncode
instance ToJSON RatchetSyncState where
toEncoding = strToJEncoding
toJSON = strToJSON
instance FromJSON RatchetSyncState where
parseJSON = strParseJSON "RatchetSyncState"
data RcvQueueInfo = RcvQueueInfo
{ rcvServer :: SMPServer,
rcvSwitchStatus :: Maybe RcvSwitchStatus,
@@ -597,17 +637,21 @@ instance StrEncoding SndQueueInfo where
data ConnectionStats = ConnectionStats
{ rcvQueuesInfo :: [RcvQueueInfo],
sndQueuesInfo :: [SndQueueInfo]
sndQueuesInfo :: [SndQueueInfo],
ratchetSyncState :: RatchetSyncState
}
deriving (Eq, Show, Generic)
instance StrEncoding ConnectionStats where
strEncode ConnectionStats {rcvQueuesInfo, sndQueuesInfo} =
"rcv=" <> strEncodeList rcvQueuesInfo <> " snd=" <> strEncodeList sndQueuesInfo
strEncode ConnectionStats {rcvQueuesInfo, sndQueuesInfo, ratchetSyncState} =
"rcv=" <> strEncodeList rcvQueuesInfo
<> (" snd=" <> strEncodeList sndQueuesInfo)
<> (" sync=" <> strEncode ratchetSyncState)
strP = do
rcvQueuesInfo <- "rcv=" *> strListP
sndQueuesInfo <- " snd=" *> strListP
pure ConnectionStats {rcvQueuesInfo, sndQueuesInfo}
ratchetSyncState <- " sync=" *> strP
pure ConnectionStats {rcvQueuesInfo, sndQueuesInfo, ratchetSyncState}
instance ToJSON ConnectionStats where toEncoding = J.genericToEncoding J.defaultOptions
@@ -710,7 +754,7 @@ data SMPConfirmation = SMPConfirmation
data AgentMsgEnvelope
= AgentConfirmation
{ agentVersion :: Version,
e2eEncryption :: Maybe (E2ERatchetParams 'C.X448),
e2eEncryption_ :: Maybe (E2ERatchetParams 'C.X448),
encConnInfo :: ByteString
}
| AgentMsgEnvelope
@@ -722,22 +766,29 @@ data AgentMsgEnvelope
connReq :: ConnectionRequestUri 'CMInvitation,
connInfo :: ByteString -- this message is only encrypted with per-queue E2E, not with double ratchet,
}
| AgentRatchetKey
{ agentVersion :: Version,
e2eEncryption :: E2ERatchetParams 'C.X448,
info :: ByteString
}
deriving (Show)
instance Encoding AgentMsgEnvelope where
smpEncode = \case
AgentConfirmation {agentVersion, e2eEncryption, encConnInfo} ->
smpEncode (agentVersion, 'C', e2eEncryption, Tail encConnInfo)
AgentConfirmation {agentVersion, e2eEncryption_, encConnInfo} ->
smpEncode (agentVersion, 'C', e2eEncryption_, Tail encConnInfo)
AgentMsgEnvelope {agentVersion, encAgentMessage} ->
smpEncode (agentVersion, 'M', Tail encAgentMessage)
AgentInvitation {agentVersion, connReq, connInfo} ->
smpEncode (agentVersion, 'I', Large $ strEncode connReq, Tail connInfo)
AgentRatchetKey {agentVersion, e2eEncryption, info} ->
smpEncode (agentVersion, 'R', e2eEncryption, Tail info)
smpP = do
agentVersion <- smpP
smpP >>= \case
'C' -> do
(e2eEncryption, Tail encConnInfo) <- smpP
pure AgentConfirmation {agentVersion, e2eEncryption, encConnInfo}
(e2eEncryption_, Tail encConnInfo) <- smpP
pure AgentConfirmation {agentVersion, e2eEncryption_, encConnInfo}
'M' -> do
Tail encAgentMessage <- smpP
pure AgentMsgEnvelope {agentVersion, encAgentMessage}
@@ -745,15 +796,21 @@ instance Encoding AgentMsgEnvelope where
connReq <- strDecode . unLarge <$?> smpP
Tail connInfo <- smpP
pure AgentInvitation {agentVersion, connReq, connInfo}
'R' -> do
e2eEncryption <- smpP
Tail info <- smpP
pure AgentRatchetKey {agentVersion, e2eEncryption, info}
_ -> fail "bad AgentMsgEnvelope"
-- SMP agent message formats (after double ratchet decryption,
-- or in case of AgentInvitation - in plain text body)
-- AgentRatchetInfo is not encrypted with double ratchet, but with per-queue E2E encryption
data AgentMessage
= AgentConnInfo ConnInfo
| -- AgentConnInfoReply is only used in duplexHandshake mode (v2), allowing to include reply queue(s) in the initial confirmation.
-- It makes REPLY message unnecessary.
AgentConnInfoReply (L.NonEmpty SMPQueueInfo) ConnInfo
| AgentRatchetInfo ByteString
| AgentMessage APrivHeader AMessage
deriving (Show)
@@ -761,17 +818,20 @@ instance Encoding AgentMessage where
smpEncode = \case
AgentConnInfo cInfo -> smpEncode ('I', Tail cInfo)
AgentConnInfoReply smpQueues cInfo -> smpEncode ('D', smpQueues, Tail cInfo) -- 'D' stands for "duplex"
AgentRatchetInfo info -> smpEncode ('R', Tail info)
AgentMessage hdr aMsg -> smpEncode ('M', hdr, aMsg)
smpP =
smpP >>= \case
'I' -> AgentConnInfo . unTail <$> smpP
'D' -> AgentConnInfoReply <$> smpP <*> (unTail <$> smpP)
'R' -> AgentRatchetInfo . unTail <$> smpP
'M' -> AgentMessage <$> smpP <*> smpP
_ -> fail "bad AgentMessage"
data AgentMessageType
= AM_CONN_INFO
| AM_CONN_INFO_REPLY
| AM_RATCHET_INFO
| AM_HELLO_
| AM_REPLY_
| AM_A_MSG_
@@ -780,12 +840,14 @@ data AgentMessageType
| AM_QKEY_
| AM_QUSE_
| AM_QTEST_
| AM_EREADY_
deriving (Eq, Show)
instance Encoding AgentMessageType where
smpEncode = \case
AM_CONN_INFO -> "C"
AM_CONN_INFO_REPLY -> "D"
AM_RATCHET_INFO -> "S"
AM_HELLO_ -> "H"
AM_REPLY_ -> "R"
AM_A_MSG_ -> "M"
@@ -794,10 +856,12 @@ instance Encoding AgentMessageType where
AM_QKEY_ -> "QK"
AM_QUSE_ -> "QU"
AM_QTEST_ -> "QT"
AM_EREADY_ -> "E"
smpP =
A.anyChar >>= \case
'C' -> pure AM_CONN_INFO
'D' -> pure AM_CONN_INFO_REPLY
'S' -> pure AM_RATCHET_INFO
'H' -> pure AM_HELLO_
'R' -> pure AM_REPLY_
'M' -> pure AM_A_MSG_
@@ -809,12 +873,14 @@ instance Encoding AgentMessageType where
'U' -> pure AM_QUSE_
'T' -> pure AM_QTEST_
_ -> fail "bad AgentMessageType"
'E' -> pure AM_EREADY_
_ -> fail "bad AgentMessageType"
agentMessageType :: AgentMessage -> AgentMessageType
agentMessageType = \case
AgentConnInfo _ -> AM_CONN_INFO
AgentConnInfoReply {} -> AM_CONN_INFO_REPLY
AgentRatchetInfo _ -> AM_RATCHET_INFO
AgentMessage _ aMsg -> case aMsg of
-- HELLO is used both in v1 and in v2, but differently.
-- - in v1 (and, possibly, in v2 for simplex connections) can be sent multiple times,
@@ -829,6 +895,7 @@ agentMessageType = \case
QKEY _ -> AM_QKEY_
QUSE _ -> AM_QUSE_
QTEST _ -> AM_QTEST_
EREADY _ -> AM_EREADY_
data APrivHeader = APrivHeader
{ -- | sequential ID assigned by the sending agent
@@ -852,6 +919,7 @@ data AMsgType
| QKEY_
| QUSE_
| QTEST_
| EREADY_
deriving (Eq)
instance Encoding AMsgType where
@@ -864,6 +932,7 @@ instance Encoding AMsgType where
QKEY_ -> "QK"
QUSE_ -> "QU"
QTEST_ -> "QT"
EREADY_ -> "E"
smpP =
A.anyChar >>= \case
'H' -> pure HELLO_
@@ -877,6 +946,7 @@ instance Encoding AMsgType where
'U' -> pure QUSE_
'T' -> pure QTEST_
_ -> fail "bad AMsgType"
'E' -> pure EREADY_
_ -> fail "bad AMsgType"
-- | Messages sent between SMP agents once SMP queue is secured.
@@ -899,6 +969,8 @@ data AMessage
QUSE (L.NonEmpty (SndQAddr, Bool))
| -- sent by the sender to test new queues and to complete switching
QTEST (L.NonEmpty SndQAddr)
| -- ratchet re-synchronization is complete, with last decrypted sender message id (recipient's `last_external_snd_msg_id`)
EREADY Int64
deriving (Show)
type SndQAddr = (SMPServer, SMP.SenderId)
@@ -913,6 +985,7 @@ instance Encoding AMessage where
QKEY qs -> smpEncode (QKEY_, qs)
QUSE qs -> smpEncode (QUSE_, qs)
QTEST qs -> smpEncode (QTEST_, qs)
EREADY lastDecryptedMsgId -> smpEncode (EREADY_, lastDecryptedMsgId)
smpP =
smpP
>>= \case
@@ -924,6 +997,7 @@ instance Encoding AMessage where
QKEY_ -> QKEY <$> smpP
QUSE_ -> QUSE <$> smpP
QTEST_ -> QTEST <$> smpP
EREADY_ -> EREADY <$> smpP
instance forall m. ConnectionModeI m => StrEncoding (ConnectionRequestUri m) where
strEncode = \case
@@ -1419,6 +1493,14 @@ instance Arbitrary SMPAgentError where arbitrary = genericArbitraryU
instance Arbitrary AgentCryptoError where arbitrary = genericArbitraryU
cryptoErrToSyncState :: AgentCryptoError -> RatchetSyncState
cryptoErrToSyncState = \case
DECRYPT_AES -> RSAllowed
DECRYPT_CB -> RSAllowed
RATCHET_HEADER -> RSRequired
RATCHET_EARLIER _ -> RSAllowed
RATCHET_SKIPPED _ -> RSRequired
-- | SMP agent command and response parser for commands passed via network (only parses binary length)
networkCommandP :: Parser ACmd
networkCommandP = commandP A.takeByteString
@@ -1448,6 +1530,7 @@ instance StrEncoding ACmdTag where
"DOWN" -> nt DOWN_
"UP" -> nt UP_
"SWITCH" -> ct SWITCH_
"RSYNC" -> ct RSYNC_
"SEND" -> t SEND_
"MID" -> ct MID_
"SENT" -> ct SENT_
@@ -1501,6 +1584,7 @@ instance (APartyI p, AEntityI e) => StrEncoding (ACommandTag p e) where
DOWN_ -> "DOWN"
UP_ -> "UP"
SWITCH_ -> "SWITCH"
RSYNC_ -> "RSYNC"
SEND_ -> "SEND"
MID_ -> "MID"
SENT_ -> "SENT"
@@ -1568,6 +1652,7 @@ commandP binaryP =
DOWN_ -> s (DOWN <$> strP_ <*> connections)
UP_ -> s (UP <$> strP_ <*> connections)
SWITCH_ -> s (SWITCH <$> strP_ <*> strP_ <*> strP)
RSYNC_ -> s (RSYNC <$> strP_ <*> strP)
MID_ -> s (MID <$> A.decimal)
SENT_ -> s (SENT <$> A.decimal)
MERR_ -> s (MERR <$> A.decimal <* A.space <*> strP)
@@ -1626,6 +1711,7 @@ serializeCommand = \case
DOWN srv conns -> B.unwords [s DOWN_, s srv, connections conns]
UP srv conns -> B.unwords [s UP_, s srv, connections conns]
SWITCH dir phase srvs -> s (SWITCH_, dir, phase, srvs)
RSYNC rrState cstats -> s (RSYNC_, rrState, cstats)
SEND msgFlags msgBody -> B.unwords [s SEND_, smpEncode msgFlags, serializeBinary msgBody]
MID mId -> s (MID_, Str $ bshow mId)
SENT mId -> s (SENT_, Str $ bshow mId)

View File

@@ -293,7 +293,9 @@ data ConnData = ConnData
connAgentVersion :: Version,
enableNtfs :: Bool,
duplexHandshake :: Maybe Bool, -- added in agent protocol v2
deleted :: Bool
lastExternalSndId :: PrevExternalSndId,
deleted :: Bool,
ratchetSyncState :: RatchetSyncState
}
deriving (Eq, Show)

View File

@@ -53,6 +53,10 @@ module Simplex.Messaging.Agent.Store.SQLite
getConnData,
setConnDeleted,
getDeletedConnIds,
setConnRatchetSync,
addProcessedRatchetKeyHash,
checkProcessedRatchetKeyHashExists,
deleteProcessedRatchetKeyHashesExpired,
getRcvConn,
getRcvQueueById,
getSndQueueById,
@@ -107,7 +111,11 @@ module Simplex.Messaging.Agent.Store.SQLite
-- Double ratchet persistence
createRatchetX3dhKeys,
getRatchetX3dhKeys,
createRatchetX3dhKeys',
getRatchetX3dhKeys',
setRatchetX3dhKeys,
createRatchet,
deleteRatchet,
getRatchet,
getSkippedMsgKeys,
updateRatchet,
@@ -1029,6 +1037,35 @@ getRatchetX3dhKeys db connId =
Right (Just k1, Just k2) -> Right (k1, k2)
_ -> Left SEX3dhKeysNotFound
createRatchetX3dhKeys' :: DB.Connection -> ConnId -> C.PrivateKeyX448 -> C.PrivateKeyX448 -> C.PublicKeyX448 -> C.PublicKeyX448 -> IO ()
createRatchetX3dhKeys' db connId x3dhPrivKey1 x3dhPrivKey2 x3dhPubKey1 x3dhPubKey2 =
DB.execute
db
"INSERT INTO ratchets (conn_id, x3dh_priv_key_1, x3dh_priv_key_2, x3dh_pub_key_1, x3dh_pub_key_2) VALUES (?,?,?,?,?)"
(connId, x3dhPrivKey1, x3dhPrivKey2, x3dhPubKey1, x3dhPubKey2)
getRatchetX3dhKeys' :: DB.Connection -> ConnId -> IO (Either StoreError (C.PrivateKeyX448, C.PrivateKeyX448, C.PublicKeyX448, C.PublicKeyX448))
getRatchetX3dhKeys' db connId =
fmap hasKeys $
firstRow id SEX3dhKeysNotFound $
DB.query db "SELECT x3dh_priv_key_1, x3dh_priv_key_2, x3dh_pub_key_1, x3dh_pub_key_2 FROM ratchets WHERE conn_id = ?" (Only connId)
where
hasKeys = \case
Right (Just pk1, Just pk2, Just k1, Just k2) -> Right (pk1, pk2, k1, k2)
_ -> Left SEX3dhKeysNotFound
-- used to remember new keys when starting ratchet re-synchronization
setRatchetX3dhKeys :: DB.Connection -> ConnId -> C.PrivateKeyX448 -> C.PrivateKeyX448 -> C.PublicKeyX448 -> C.PublicKeyX448 -> IO ()
setRatchetX3dhKeys db connId x3dhPrivKey1 x3dhPrivKey2 x3dhPubKey1 x3dhPubKey2 =
DB.execute
db
[sql|
UPDATE ratchets
SET x3dh_priv_key_1 = ?, x3dh_priv_key_2 = ?, x3dh_pub_key_1 = ?, x3dh_pub_key_2 = ?
WHERE conn_id = ?
|]
(x3dhPrivKey1, x3dhPrivKey2, x3dhPubKey1, x3dhPubKey2, connId)
createRatchet :: DB.Connection -> ConnId -> RatchetX448 -> IO ()
createRatchet db connId rc =
DB.executeNamed
@@ -1039,10 +1076,16 @@ createRatchet db connId rc =
ON CONFLICT (conn_id) DO UPDATE SET
ratchet_state = :ratchet_state,
x3dh_priv_key_1 = NULL,
x3dh_priv_key_2 = NULL
x3dh_priv_key_2 = NULL,
x3dh_pub_key_1 = NULL,
x3dh_pub_key_2 = NULL
|]
[":conn_id" := connId, ":ratchet_state" := rc]
deleteRatchet :: DB.Connection -> ConnId -> IO ()
deleteRatchet db connId =
DB.execute db "DELETE FROM ratchets WHERE conn_id = ?" (Only connId)
getRatchet :: DB.Connection -> ConnId -> IO (Either StoreError RatchetX448)
getRatchet db connId =
firstRow' ratchet SERatchetNotFound $ DB.query db "SELECT ratchet_state FROM ratchets WHERE conn_id = ?" (Only connId)
@@ -1643,10 +1686,21 @@ getAnyConns_ deleted' db connIds = forM connIds $ E.handle handleDBError . getAn
handleDBError = pure . Left . SEInternal . bshow
getConnData :: DB.Connection -> ConnId -> IO (Maybe (ConnData, ConnectionMode))
getConnData dbConn connId' =
maybeFirstRow cData $ DB.query dbConn "SELECT user_id, conn_id, conn_mode, smp_agent_version, enable_ntfs, duplex_handshake, deleted FROM connections WHERE conn_id = ?;" (Only connId')
getConnData db connId' =
maybeFirstRow cData $
DB.query
db
[sql|
SELECT
user_id, conn_id, conn_mode, smp_agent_version, enable_ntfs, duplex_handshake,
last_external_snd_msg_id, deleted, ratchet_sync_state
FROM connections
WHERE conn_id = ?
|]
(Only connId')
where
cData (userId, connId, cMode, connAgentVersion, enableNtfs_, duplexHandshake, deleted) = (ConnData {userId, connId, connAgentVersion, enableNtfs = fromMaybe True enableNtfs_, duplexHandshake, deleted}, cMode)
cData (userId, connId, cMode, connAgentVersion, enableNtfs_, duplexHandshake, lastExternalSndId, deleted, ratchetSyncState) =
(ConnData {userId, connId, connAgentVersion, enableNtfs = fromMaybe True enableNtfs_, duplexHandshake, lastExternalSndId, deleted, ratchetSyncState}, cMode)
setConnDeleted :: DB.Connection -> ConnId -> IO ()
setConnDeleted db connId = DB.execute db "UPDATE connections SET deleted = ? WHERE conn_id = ?" (True, connId)
@@ -1654,6 +1708,30 @@ setConnDeleted db connId = DB.execute db "UPDATE connections SET deleted = ? WHE
getDeletedConnIds :: DB.Connection -> IO [ConnId]
getDeletedConnIds db = map fromOnly <$> DB.query db "SELECT conn_id FROM connections WHERE deleted = ?" (Only True)
setConnRatchetSync :: DB.Connection -> ConnId -> RatchetSyncState -> IO ()
setConnRatchetSync db connId ratchetSyncState =
DB.execute db "UPDATE connections SET ratchet_sync_state = ? WHERE conn_id = ?" (ratchetSyncState, connId)
addProcessedRatchetKeyHash :: DB.Connection -> ConnId -> ByteString -> IO ()
addProcessedRatchetKeyHash db connId hash =
DB.execute db "INSERT INTO processed_ratchet_key_hashes (conn_id, hash) VALUES (?,?)" (connId, hash)
checkProcessedRatchetKeyHashExists :: DB.Connection -> ConnId -> ByteString -> IO Bool
checkProcessedRatchetKeyHashExists db connId hash = do
fromMaybe False
<$> maybeFirstRow
fromOnly
( DB.query
db
"SELECT 1 FROM processed_ratchet_key_hashes WHERE conn_id = ? AND hash = ? LIMIT 1"
(connId, hash)
)
deleteProcessedRatchetKeyHashesExpired :: DB.Connection -> NominalDiffTime -> IO ()
deleteProcessedRatchetKeyHashesExpired db ttl = do
cutoffTs <- addUTCTime (- ttl) <$> getCurrentTime
DB.execute db "DELETE FROM processed_ratchet_key_hashes WHERE created_at < ?" (Only cutoffTs)
-- | returns all connection queues, the first queue is the primary one
getRcvQueuesByConnId_ :: DB.Connection -> ConnId -> IO (Maybe (NonEmpty RcvQueue))
getRcvQueuesByConnId_ db connId =

View File

@@ -61,6 +61,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230401_snd_files
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230510_files_pending_replicas_indexes
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230516_encrypted_rcv_message_hashes
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230531_switch_status
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230615_ratchet_sync
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (dropPrefix, sumTypeJSON)
import Simplex.Messaging.Transport.Client (TransportHost)
@@ -88,7 +89,8 @@ schemaMigrations =
("m20230401_snd_files", m20230401_snd_files, Just down_m20230401_snd_files),
("m20230510_files_pending_replicas_indexes", m20230510_files_pending_replicas_indexes, Just down_m20230510_files_pending_replicas_indexes),
("m20230516_encrypted_rcv_message_hashes", m20230516_encrypted_rcv_message_hashes, Just down_m20230516_encrypted_rcv_message_hashes),
("m20230531_switch_status", m20230531_switch_status, Just down_m20230531_switch_status)
("m20230531_switch_status", m20230531_switch_status, Just down_m20230531_switch_status),
("m20230615_ratchet_sync", m20230615_ratchet_sync, Just down_m20230615_ratchet_sync)
]
-- | The list of migrations in ascending order by date

View File

@@ -0,0 +1,41 @@
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230615_ratchet_sync where
import Database.SQLite.Simple (Query)
import Database.SQLite.Simple.QQ (sql)
-- Ratchet public keys are saved when ratchet re-synchronization is started - upon receiving other party's public keys,
-- keys are compared to determine ratchet initialization ordering for both parties.
-- This solves a possible race when both parties start ratchet re-synchronization at the same time.
m20230615_ratchet_sync :: Query
m20230615_ratchet_sync =
[sql|
ALTER TABLE connections ADD COLUMN ratchet_sync_state TEXT NOT NULL DEFAULT 'ok';
ALTER TABLE ratchets ADD COLUMN x3dh_pub_key_1 BLOB;
ALTER TABLE ratchets ADD COLUMN x3dh_pub_key_2 BLOB;
CREATE TABLE processed_ratchet_key_hashes(
processed_ratchet_key_hash_id INTEGER PRIMARY KEY,
conn_id BLOB NOT NULL REFERENCES connections ON DELETE CASCADE,
hash BLOB NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX idx_processed_ratchet_key_hashes_hash ON processed_ratchet_key_hashes(conn_id, hash);
|]
down_m20230615_ratchet_sync :: Query
down_m20230615_ratchet_sync =
[sql|
DROP INDEX idx_processed_ratchet_key_hashes_hash;
DROP TABLE processed_ratchet_key_hashes;
ALTER TABLE ratchets DROP COLUMN x3dh_pub_key_2;
ALTER TABLE ratchets DROP COLUMN x3dh_pub_key_1;
ALTER TABLE connections DROP COLUMN ratchet_sync_state;
|]

View File

@@ -25,7 +25,8 @@ CREATE TABLE connections(
enable_ntfs INTEGER,
deleted INTEGER DEFAULT 0 CHECK(deleted NOT NULL),
user_id INTEGER CHECK(user_id NOT NULL)
REFERENCES users ON DELETE CASCADE
REFERENCES users ON DELETE CASCADE,
ratchet_sync_state TEXT NOT NULL DEFAULT 'ok'
) WITHOUT ROWID;
CREATE TABLE rcv_queues(
host TEXT NOT NULL,
@@ -154,6 +155,9 @@ CREATE TABLE ratchets(
-- ratchet is initially empty on the receiving side(the side offering the connection)
ratchet_state BLOB,
e2e_version INTEGER NOT NULL DEFAULT 1
,
x3dh_pub_key_1 BLOB,
x3dh_pub_key_2 BLOB
) WITHOUT ROWID;
CREATE TABLE skipped_messages(
skipped_message_id INTEGER PRIMARY KEY,
@@ -356,6 +360,13 @@ CREATE TABLE encrypted_rcv_message_hashes(
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
CREATE TABLE processed_ratchet_key_hashes(
processed_ratchet_key_hash_id INTEGER PRIMARY KEY,
conn_id BLOB NOT NULL REFERENCES connections ON DELETE CASCADE,
hash BLOB NOT NULL,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
CREATE UNIQUE INDEX idx_rcv_queues_ntf ON rcv_queues(host, port, ntf_id);
CREATE UNIQUE INDEX idx_rcv_queue_id ON rcv_queues(conn_id, rcv_queue_id);
CREATE UNIQUE INDEX idx_snd_queue_id ON snd_queues(conn_id, snd_queue_id);
@@ -446,3 +457,7 @@ CREATE INDEX idx_encrypted_rcv_message_hashes_hash ON encrypted_rcv_message_hash
conn_id,
hash
);
CREATE INDEX idx_processed_ratchet_key_hashes_hash ON processed_ratchet_key_hashes(
conn_id,
hash
);