agent: use strict tables (#1686)

* agent: use strict tables

* migrate existing tables to strict

* test: verify that all tables are strict

* fix column types for device_token and ntf_mode

* fix encodings and column types for ntf_sub_action and ntf_sub_smp_action

* update schema

* remove debug.trace

* log
This commit is contained in:
Evgeny
2026-01-03 17:19:18 +00:00
committed by GitHub
parent d6df769799
commit a7b43b1a3e
17 changed files with 269 additions and 123 deletions

View File

@@ -240,6 +240,7 @@ import Simplex.Messaging.Agent.Stats
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.AgentStore (getClientNotices, updateClientNotices)
import Simplex.Messaging.Agent.Store.Common (DBStore, withTransaction)
import Simplex.Messaging.Agent.Store.DB (SQLError)
import qualified Simplex.Messaging.Agent.Store.DB as DB
import Simplex.Messaging.Agent.Store.Entity
import Simplex.Messaging.Agent.TSessionSubs (TSessionSubs)
@@ -2124,7 +2125,9 @@ withWork_ c doWork getWork action =
| otherwise -> notifyErr INTERNAL e
where
noWork = liftIO $ noWorkToDo doWork
notifyErr err e = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e)
notifyErr err e = do
logError $ "withWork_ error: " <> tshow e
atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e)
withWorkItems :: (AnyStoreError e', MonadIO m) => AgentClient -> TMVar () -> ExceptT e m (Either e' [Either e' a]) -> (NonEmpty a -> ExceptT e m ()) -> ExceptT e m ()
withWorkItems c doWork getWork action = do
@@ -2145,7 +2148,9 @@ withWorkItems c doWork getWork action = do
| otherwise -> notifyErr INTERNAL e
where
noWork = liftIO $ noWorkToDo doWork
notifyErr err e = atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e)
notifyErr err e = do
logError $ "withWorkItems error: " <> tshow e
atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ err $ show e)
noWorkToDo :: TMVar () -> IO ()
noWorkToDo = void . atomically . tryTakeTMVar
@@ -2243,24 +2248,19 @@ withStore :: AgentClient -> (DB.Connection -> IO (Either StoreError a)) -> AM a
withStore c action = do
st <- asks store
withExceptT storeError . ExceptT . liftIO . agentOperationBracket c AODatabase (\_ -> pure ()) $
withTransaction st action `E.catches` handleDBErrors
withTransaction st action `E.catch` handleDBErrors
where
handleDBErrors :: E.SomeException -> IO (Either StoreError a)
handleDBErrors e = pure $ Left $ case E.fromException e of
Just (e' :: SQLError) ->
#if defined(dbPostgres)
-- TODO [postgres] postgres specific error handling
handleDBErrors :: [E.Handler IO (Either StoreError a)]
handleDBErrors =
[ E.Handler $ \(E.SomeException e) -> pure . Left $ SEInternal $ bshow e
]
SEInternal $ bshow e'
#else
handleDBErrors :: [E.Handler IO (Either StoreError a)]
handleDBErrors =
[ E.Handler $ \(e :: SQL.SQLError) ->
let se = SQL.sqlError e
busy = se == SQL.ErrorBusy || se == SQL.ErrorLocked
in pure . Left . (if busy then SEDatabaseBusy else SEInternal) $ bshow se,
E.Handler $ \(E.SomeException e) -> pure . Left $ SEInternal $ bshow e
]
let se = SQL.sqlError e'
busy = se == SQL.ErrorBusy || se == SQL.ErrorLocked
in (if busy then SEDatabaseBusy else SEInternal) $ bshow e'
#endif
Nothing -> SEInternal $ bshow e
unsafeWithStore :: AgentClient -> (DB.Connection -> IO a) -> AM' a
unsafeWithStore c action = do

View File

@@ -36,6 +36,7 @@ import qualified Data.List.NonEmpty as L
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes)
import qualified Data.Set as S
import qualified Data.Text as T
import Data.Time (UTCTime, addUTCTime, getCurrentTime)
import Data.Time.Clock (diffUTCTime)
import Simplex.Messaging.Agent.Client
@@ -46,13 +47,13 @@ import Simplex.Messaging.Agent.Stats
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.AgentStore
import qualified Simplex.Messaging.Agent.Store.DB as DB
import Simplex.Messaging.Client (NetworkRequestMode (..))
import Simplex.Messaging.Client (NetworkRequestMode (..), nonBlockingWriteTBQueue)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Protocol (NtfServer, sameSrvAddr)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.Util (catchAllErrors, diffToMicroseconds, threadDelay', tryAllErrors, tshow, whenM)
import Simplex.Messaging.Util (catchAllErrors, catchAllErrors', diffToMicroseconds, threadDelay', tryAllErrors, tshow, whenM)
import System.Random (randomR)
import UnliftIO
import UnliftIO.Concurrent (forkIO)
@@ -66,19 +67,15 @@ runNtfSupervisor c = do
Right _ -> pure ()
forever $ do
cmd <- atomically . readTBQueue $ ntfSubQ ns
handleErr . agentOperationBracket c AONtfNetwork waitUntilActive $
runExceptT (processNtfCmd c cmd) >>= \case
Left e -> notifyErr e
Right _ -> return ()
handleErr $ agentOperationBracket c AONtfNetwork waitUntilActive $
processNtfCmd c cmd `catchAllErrors'` notifyErr
where
startTknDelete :: AM ()
startTknDelete = do
pendingDelServers <- withStore' c getPendingDelTknServers
lift . forM_ pendingDelServers $ getNtfTknDelWorker True c
handleErr :: AM' () -> AM' ()
handleErr = E.handle $ \(e :: E.SomeException) -> do
logError $ "runNtfSupervisor error " <> tshow e
notifyErr e
handleErr = E.handle $ \(e :: E.SomeException) -> notifyErr e
notifyErr e = notifyInternalError' c $ "runNtfSupervisor error " <> show e
partitionErrs :: (a -> ConnId) -> [a] -> [Either AgentErrorType b] -> ([(ConnId, AgentErrorType)], [b])
@@ -505,16 +502,18 @@ workerInternalError c connId internalErrStr = do
-- TODO change error
notifyInternalError :: MonadIO m => AgentClient -> ConnId -> String -> m ()
notifyInternalError AgentClient {subQ} connId internalErrStr = atomically $ writeTBQueue subQ ("", connId, AEvt SAEConn $ ERR $ INTERNAL internalErrStr)
{-# INLINE notifyInternalError #-}
notifyInternalError AgentClient {subQ} connId internalErrStr = do
logError $ T.pack internalErrStr
liftIO $ nonBlockingWriteTBQueue subQ ("", connId, AEvt SAEConn $ ERR $ INTERNAL internalErrStr)
notifyInternalError' :: MonadIO m => AgentClient -> String -> m ()
notifyInternalError' AgentClient {subQ} internalErrStr = atomically $ writeTBQueue subQ ("", "", AEvt SAEConn $ ERR $ INTERNAL internalErrStr)
notifyInternalError' c = notifyInternalError c ""
{-# INLINE notifyInternalError' #-}
notifyErrs :: MonadIO m => AgentClient -> [(ConnId, AgentErrorType)] -> m ()
notifyErrs c = mapM_ (notifySub c . ERRS) . L.nonEmpty
{-# INLINE notifyErrs #-}
notifyErrs c errs_ = forM_ (L.nonEmpty errs_) $ \errs -> do
logError $ "notifyErrs: " <> tshow errs
notifySub c $ ERRS errs
getNtfToken :: AM' (Maybe NtfToken)
getNtfToken = do

View File

@@ -698,9 +698,9 @@ instance ToJSON NotificationsMode where
instance FromJSON NotificationsMode where
parseJSON = strParseJSON "NotificationsMode"
instance ToField NotificationsMode where toField = toField . strEncode
instance ToField NotificationsMode where toField = toField . decodeLatin1 . strEncode
instance FromField NotificationsMode where fromField = blobFieldDecoder $ parseAll strP
instance FromField NotificationsMode where fromField = fromTextField_ $ eitherToMaybe . strDecode . encodeUtf8
data NotificationInfo = NotificationInfo
{ ntfConnId :: ConnId,

View File

@@ -16,7 +16,7 @@
module Simplex.Messaging.Agent.Store where
import Control.Exception (Exception)
import Control.Exception (Exception (..))
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.ByteString.Char8 (ByteString)
import Data.Int (Int64)
@@ -31,6 +31,7 @@ import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval (RI2State)
import Simplex.Messaging.Agent.Store.Entity
import Simplex.Messaging.Agent.Store.Common
import Simplex.Messaging.Agent.Store.DB (SQLError)
import Simplex.Messaging.Agent.Store.Interface (createDBStore)
import Simplex.Messaging.Agent.Store.Migrations.App (appMigrations)
import Simplex.Messaging.Agent.Store.Shared (MigrationConfig (..), MigrationError (..))
@@ -752,7 +753,9 @@ data StoreError
deriving (Eq, Show, Exception)
instance AnyError StoreError where
fromSomeException = SEInternal . bshow
fromSomeException e = SEInternal $ case fromException e of
Just (e' :: SQLError) -> bshow e'
Nothing -> bshow e
class (Show e, AnyError e) => AnyStoreError e where
isWorkItemError :: e -> Bool

View File

@@ -285,7 +285,7 @@ import Simplex.Messaging.Agent.Stats
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.Common
import qualified Simplex.Messaging.Agent.Store.DB as DB
import Simplex.Messaging.Agent.Store.DB (Binary (..), BoolInt (..), FromField (..), ToField (..), blobFieldDecoder, fromTextField_)
import Simplex.Messaging.Agent.Store.DB (Binary (..), BoolInt (..), FromField (..), ToField (..), SQLError, blobFieldDecoder, fromTextField_)
import Simplex.Messaging.Agent.Store.Entity
import Simplex.Messaging.Client (SMPTransportSession)
import qualified Simplex.Messaging.Crypto as C
@@ -308,11 +308,11 @@ import qualified UnliftIO.Exception as E
import UnliftIO.STM
#if defined(dbPostgres)
import Data.List (sortOn)
import Database.PostgreSQL.Simple (In (..), Only (..), Query, SqlError, (:.) (..))
import Database.PostgreSQL.Simple (In (..), Only (..), Query, (:.) (..))
import Database.PostgreSQL.Simple.Errors (constraintViolation)
import Database.PostgreSQL.Simple.SqlQQ (sql)
#else
import Database.SQLite.Simple (FromRow (..), Only (..), Query (..), SQLError, ToRow (..), field, (:.) (..))
import Database.SQLite.Simple (FromRow (..), Only (..), Query (..), ToRow (..), field, (:.) (..))
import qualified Database.SQLite.Simple as SQL
import Database.SQLite.Simple.QQ (sql)
#endif
@@ -320,13 +320,12 @@ import Database.SQLite.Simple.QQ (sql)
checkConstraint :: StoreError -> IO (Either StoreError a) -> IO (Either StoreError a)
checkConstraint err action = action `E.catch` (pure . Left . handleSQLError err)
handleSQLError :: StoreError -> SQLError -> StoreError
#if defined(dbPostgres)
handleSQLError :: StoreError -> SqlError -> StoreError
handleSQLError err e = case constraintViolation e of
Just _ -> err
Nothing -> SEInternal $ bshow e
#else
handleSQLError :: StoreError -> SQLError -> StoreError
handleSQLError err e
| SQL.sqlError e == SQL.ErrorConstraint = err
| otherwise = SEInternal $ bshow e
@@ -1428,7 +1427,7 @@ createNtfToken db NtfToken {deviceToken = DeviceToken provider token, ntfServer
INSERT INTO ntf_tokens
(provider, device_token, ntf_host, ntf_port, tkn_id, tkn_pub_key, tkn_priv_key, tkn_pub_dh_key, tkn_priv_dh_key, tkn_dh_secret, tkn_status, tkn_action, ntf_mode) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|]
((provider, token, host, port, ntfTokenId, ntfPubKey, ntfPrivKey, ntfDhPubKey, ntfDhPrivKey, ntfDhSecret) :. (ntfTknStatus, ntfTknAction, ntfMode))
((provider, Binary token, host, port, ntfTokenId, ntfPubKey, ntfPrivKey, ntfDhPubKey, ntfDhPrivKey, ntfDhSecret) :. (ntfTknStatus, ntfTknAction, ntfMode))
getSavedNtfToken :: DB.Connection -> IO (Maybe NtfToken)
getSavedNtfToken db = do
@@ -1443,7 +1442,7 @@ getSavedNtfToken db = do
JOIN ntf_servers s USING (ntf_host, ntf_port)
|]
where
ntfToken ((host, port, keyHash) :. (provider, dt, ntfTokenId, ntfPubKey, ntfPrivKey, ntfDhPubKey, ntfDhPrivKey, ntfDhSecret) :. (ntfTknStatus, ntfTknAction, ntfMode_)) =
ntfToken ((host, port, keyHash) :. (provider, Binary dt, ntfTokenId, ntfPubKey, ntfPrivKey, ntfDhPubKey, ntfDhPrivKey, ntfDhSecret) :. (ntfTknStatus, ntfTknAction, ntfMode_)) =
let ntfServer = NtfServer host port keyHash
ntfDhKeys = (ntfDhPubKey, ntfDhPrivKey)
ntfMode = fromMaybe NMPeriodic ntfMode_
@@ -1459,7 +1458,7 @@ updateNtfTokenRegistration db NtfToken {deviceToken = DeviceToken provider token
SET tkn_id = ?, tkn_dh_secret = ?, tkn_status = ?, tkn_action = ?, updated_at = ?
WHERE provider = ? AND device_token = ? AND ntf_host = ? AND ntf_port = ?
|]
(tknId, ntfDhSecret, NTRegistered, Nothing :: Maybe NtfTknAction, updatedAt, provider, token, host, port)
(tknId, ntfDhSecret, NTRegistered, Nothing :: Maybe NtfTknAction, updatedAt, provider, Binary token, host, port)
updateDeviceToken :: DB.Connection -> NtfToken -> DeviceToken -> IO ()
updateDeviceToken db NtfToken {deviceToken = DeviceToken provider token, ntfServer = ProtocolServer {host, port}} (DeviceToken toProvider toToken) = do
@@ -1471,7 +1470,7 @@ updateDeviceToken db NtfToken {deviceToken = DeviceToken provider token, ntfServ
SET provider = ?, device_token = ?, tkn_status = ?, tkn_action = ?, updated_at = ?
WHERE provider = ? AND device_token = ? AND ntf_host = ? AND ntf_port = ?
|]
(toProvider, toToken, NTRegistered, Nothing :: Maybe NtfTknAction, updatedAt, provider, token, host, port)
(toProvider, Binary toToken, NTRegistered, Nothing :: Maybe NtfTknAction, updatedAt, provider, Binary token, host, port)
updateNtfMode :: DB.Connection -> NtfToken -> NotificationsMode -> IO ()
updateNtfMode db NtfToken {deviceToken = DeviceToken provider token, ntfServer = ProtocolServer {host, port}} ntfMode = do
@@ -1483,7 +1482,7 @@ updateNtfMode db NtfToken {deviceToken = DeviceToken provider token, ntfServer =
SET ntf_mode = ?, updated_at = ?
WHERE provider = ? AND device_token = ? AND ntf_host = ? AND ntf_port = ?
|]
(ntfMode, updatedAt, provider, token, host, port)
(ntfMode, updatedAt, provider, Binary token, host, port)
updateNtfToken :: DB.Connection -> NtfToken -> NtfTknStatus -> Maybe NtfTknAction -> IO ()
updateNtfToken db NtfToken {deviceToken = DeviceToken provider token, ntfServer = ProtocolServer {host, port}} tknStatus tknAction = do
@@ -1495,7 +1494,7 @@ updateNtfToken db NtfToken {deviceToken = DeviceToken provider token, ntfServer
SET tkn_status = ?, tkn_action = ?, updated_at = ?
WHERE provider = ? AND device_token = ? AND ntf_host = ? AND ntf_port = ?
|]
(tknStatus, tknAction, updatedAt, provider, token, host, port)
(tknStatus, tknAction, updatedAt, provider, Binary token, host, port)
removeNtfToken :: DB.Connection -> NtfToken -> IO ()
removeNtfToken db NtfToken {deviceToken = DeviceToken provider token, ntfServer = ProtocolServer {host, port}} =
@@ -1505,7 +1504,7 @@ removeNtfToken db NtfToken {deviceToken = DeviceToken provider token, ntfServer
DELETE FROM ntf_tokens
WHERE provider = ? AND device_token = ? AND ntf_host = ? AND ntf_port = ?
|]
(provider, token, host, port)
(provider, Binary token, host, port)
addNtfTokenToDelete :: DB.Connection -> NtfServer -> C.APrivateAuthKey -> NtfTokenId -> IO ()
addNtfTokenToDelete db ProtocolServer {host, port, keyHash} ntfPrivKey tknId =
@@ -1819,7 +1818,7 @@ getActiveNtfToken db =
|]
(Only NTActive)
where
ntfToken ((host, port, keyHash) :. (provider, dt, ntfTokenId, ntfPubKey, ntfPrivKey, ntfDhPubKey, ntfDhPrivKey, ntfDhSecret) :. (ntfTknStatus, ntfTknAction, ntfMode_)) =
ntfToken ((host, port, keyHash) :. (provider, Binary dt, ntfTokenId, ntfPubKey, ntfPrivKey, ntfDhPubKey, ntfDhPrivKey, ntfDhSecret) :. (ntfTknStatus, ntfTknAction, ntfMode_)) =
let ntfServer = NtfServer host port keyHash
ntfDhKeys = (ntfDhPubKey, ntfDhPrivKey)
ntfMode = fromMaybe NMPeriodic ntfMode_

View File

@@ -6,6 +6,7 @@ module Simplex.Messaging.Agent.Store.Postgres.DB
PSQL.Connection,
FromField (..),
ToField (..),
SQLError,
PSQL.connect,
PSQL.close,
execute,
@@ -33,6 +34,8 @@ import Database.PostgreSQL.Simple.TypeInfo.Static (textOid, varcharOid)
newtype BoolInt = BI {unBI :: Bool}
type SQLError = PSQL.SqlError
instance FromField BoolInt where
fromField field dat = BI . (/= (0 :: Int)) <$> fromField field dat
{-# INLINE fromField #-}

View File

@@ -10,6 +10,7 @@ import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250322_short_links
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitations_remove_cascade_delete
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251010_client_notices
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251230_strict_tables
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
schemaMigrations :: [(String, Text, Maybe Text)]
@@ -19,7 +20,8 @@ schemaMigrations =
("20250322_short_links", m20250322_short_links, Just down_m20250322_short_links),
("20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete),
("20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe),
("20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices)
("20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices),
("20251230_strict_tables", m20251230_strict_tables, Just down_m20251230_strict_tables)
]
-- | The list of migrations in ascending order by date

View File

@@ -0,0 +1,66 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251230_strict_tables where
import Data.Text (Text)
import Text.RawString.QQ (r)
m20251230_strict_tables :: Text
m20251230_strict_tables =
isValidText
<> [r|
DELETE FROM ntf_tokens
WHERE NOT simplex_is_valid_text(ntf_mode);
ALTER TABLE ntf_tokens
ALTER COLUMN device_token TYPE BYTEA USING device_token::BYTEA,
ALTER COLUMN ntf_mode TYPE TEXT USING ntf_mode::TEXT;
UPDATE ntf_subscriptions
SET ntf_sub_action = NULL
WHERE NOT simplex_is_valid_text(ntf_sub_action);
UPDATE ntf_subscriptions
SET ntf_sub_smp_action = NULL
WHERE NOT simplex_is_valid_text(ntf_sub_smp_action);
ALTER TABLE ntf_subscriptions
ALTER COLUMN ntf_sub_action TYPE TEXT USING ntf_sub_action::TEXT,
ALTER COLUMN ntf_sub_smp_action TYPE TEXT USING ntf_sub_smp_action::TEXT;
DROP FUNCTION simplex_is_valid_text(BYTEA);
|]
down_m20251230_strict_tables :: Text
down_m20251230_strict_tables =
isValidText
<> [r|
DELETE FROM ntf_tokens
WHERE NOT simplex_is_valid_text(device_token);
ALTER TABLE ntf_tokens
ALTER COLUMN device_token TYPE TEXT USING device_token::TEXT,
ALTER COLUMN ntf_mode TYPE BYTEA USING ntf_mode::BYTEA;
ALTER TABLE ntf_subscriptions
ALTER COLUMN ntf_sub_action TYPE BYTEA USING ntf_sub_action::BYTEA,
ALTER COLUMN ntf_sub_smp_action TYPE BYTEA USING ntf_sub_smp_action::BYTEA;
DROP FUNCTION simplex_is_valid_text(BYTEA);
|]
isValidText :: Text
isValidText =
[r|
CREATE FUNCTION simplex_is_valid_text(b BYTEA)
RETURNS BOOLEAN
LANGUAGE plpgsql AS $$
BEGIN
PERFORM b::TEXT;
RETURN TRUE;
EXCEPTION
WHEN OTHERS THEN RETURN FALSE;
END;
$$;
|]

View File

@@ -14,6 +14,7 @@ module Simplex.Messaging.Agent.Store.SQLite.DB
TrackQueries (..),
FromField (..),
ToField (..),
SQLError,
open,
close,
execute,
@@ -38,7 +39,7 @@ import Data.Text (Text)
import qualified Data.Text as T
import Data.Time (diffUTCTime, getCurrentTime)
import Data.Typeable (Typeable)
import Database.SQLite.Simple (FromRow, ResultError (..), Query, SQLData (..), ToRow)
import Database.SQLite.Simple (FromRow, ResultError (..), Query, SQLData (..), SQLError, ToRow)
import qualified Database.SQLite.Simple as SQL
import Database.SQLite.Simple.FromField (FieldParser, FromField (..), returnError)
import Database.SQLite.Simple.Internal (Field (..))

View File

@@ -46,6 +46,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250322_short_links
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250702_conn_invitations_remove_cascade_delete
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251010_client_notices
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251230_strict_tables
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
schemaMigrations :: [(String, Query, Maybe Query)]
@@ -91,7 +92,8 @@ schemaMigrations =
("m20250322_short_links", m20250322_short_links, Just down_m20250322_short_links),
("m20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete),
("m20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe),
("m20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices)
("m20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices),
("m20251230_strict_tables", m20251230_strict_tables, Just down_m20251230_strict_tables)
]
-- | The list of migrations in ascending order by date

View File

@@ -19,7 +19,7 @@ CREATE TABLE ntf_servers (
CREATE TABLE ntf_tokens (
provider TEXT NOT NULL, -- apns
device_token TEXT NOT NULL, -- ! this field is mislabeled and is actually saved as binary
device_token TEXT NOT NULL,
ntf_host TEXT NOT NULL,
ntf_port TEXT NOT NULL,
tkn_id BLOB, -- token ID assigned by notifications server

View File

@@ -0,0 +1,58 @@
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251230_strict_tables where
import Database.SQLite.Simple (Query)
import Database.SQLite.Simple.QQ (sql)
m20251230_strict_tables :: Query
m20251230_strict_tables =
[sql|
UPDATE ntf_tokens SET ntf_mode = CAST(ntf_mode as TEXT);
UPDATE ntf_subscriptions
SET ntf_sub_action = CAST(ntf_sub_action as TEXT),
ntf_sub_smp_action = CAST(ntf_sub_smp_action as TEXT);
PRAGMA writable_schema=1;
UPDATE sqlite_master
SET sql = CASE
WHEN LOWER(SUBSTR(sql, -15)) = ') without rowid' THEN sql || ', STRICT'
WHEN SUBSTR(sql, -1) = ')' THEN sql || ' STRICT'
ELSE sql
END
WHERE type = 'table' AND name != 'sqlite_sequence';
UPDATE sqlite_master
SET sql = replace(sql, 'device_token TEXT NOT NULL', 'device_token BLOB NOT NULL')
WHERE type = 'table' AND name = 'ntf_tokens';
PRAGMA writable_schema=0;
|]
down_m20251230_strict_tables :: Query
down_m20251230_strict_tables =
[sql|
PRAGMA writable_schema=1;
UPDATE sqlite_master
SET sql = CASE
WHEN LOWER(SUBSTR(sql, -8)) = ', strict' THEN SUBSTR(sql, 1, LENGTH(sql) - 8)
WHEN LOWER(SUBSTR(sql, -7)) = ' strict' THEN SUBSTR(sql, 1, LENGTH(sql) - 7)
ELSE sql
END
WHERE type = 'table' AND name != 'sqlite_sequence';
UPDATE sqlite_master
SET sql = replace(sql, 'device_token BLOB NOT NULL', 'device_token TEXT NOT NULL')
WHERE type = 'table' AND name = 'ntf_tokens';
PRAGMA writable_schema=0;
UPDATE ntf_tokens SET ntf_mode = CAST(ntf_mode as BLOB);
UPDATE ntf_subscriptions
SET ntf_sub_action = CAST(ntf_sub_action as BLOB),
ntf_sub_smp_action = CAST(ntf_sub_smp_action as BLOB);
|]

View File

@@ -2,13 +2,13 @@ CREATE TABLE migrations(
name TEXT NOT NULL PRIMARY KEY,
ts TEXT NOT NULL,
down TEXT
);
) STRICT;
CREATE TABLE servers(
host TEXT NOT NULL,
port TEXT NOT NULL,
key_hash BLOB NOT NULL,
PRIMARY KEY(host, port)
) WITHOUT ROWID;
) WITHOUT ROWID, STRICT;
CREATE TABLE connections(
conn_id BLOB NOT NULL PRIMARY KEY,
conn_mode TEXT NOT NULL,
@@ -28,7 +28,7 @@ CREATE TABLE connections(
ratchet_sync_state TEXT NOT NULL DEFAULT 'ok',
deleted_at_wait_delivery TEXT,
pq_support INTEGER NOT NULL DEFAULT 0
) WITHOUT ROWID;
) WITHOUT ROWID, STRICT;
CREATE TABLE rcv_queues(
host TEXT NOT NULL,
port TEXT NOT NULL,
@@ -67,7 +67,7 @@ CREATE TABLE rcv_queues(
FOREIGN KEY(host, port) REFERENCES servers
ON DELETE RESTRICT ON UPDATE CASCADE,
UNIQUE(host, port, snd_id)
) WITHOUT ROWID;
) WITHOUT ROWID, STRICT;
CREATE TABLE snd_queues(
host TEXT NOT NULL,
port TEXT NOT NULL,
@@ -89,7 +89,7 @@ CREATE TABLE snd_queues(
PRIMARY KEY(host, port, snd_id),
FOREIGN KEY(host, port) REFERENCES servers
ON DELETE RESTRICT ON UPDATE CASCADE
) WITHOUT ROWID;
) WITHOUT ROWID, STRICT;
CREATE TABLE messages(
conn_id BLOB NOT NULL REFERENCES connections(conn_id)
ON DELETE CASCADE,
@@ -106,7 +106,7 @@ CREATE TABLE messages(
ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
FOREIGN KEY(conn_id, internal_snd_id) REFERENCES snd_messages
ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED
) WITHOUT ROWID;
) WITHOUT ROWID, STRICT;
CREATE TABLE rcv_messages(
conn_id BLOB NOT NULL,
internal_rcv_id INTEGER NOT NULL,
@@ -122,7 +122,7 @@ CREATE TABLE rcv_messages(
PRIMARY KEY(conn_id, internal_rcv_id),
FOREIGN KEY(conn_id, internal_id) REFERENCES messages
ON DELETE CASCADE
) WITHOUT ROWID;
) WITHOUT ROWID, STRICT;
CREATE TABLE snd_messages(
conn_id BLOB NOT NULL,
internal_snd_id INTEGER NOT NULL,
@@ -139,7 +139,7 @@ CREATE TABLE snd_messages(
PRIMARY KEY(conn_id, internal_snd_id),
FOREIGN KEY(conn_id, internal_id) REFERENCES messages
ON DELETE CASCADE
) WITHOUT ROWID;
) WITHOUT ROWID, STRICT;
CREATE TABLE conn_confirmations(
confirmation_id BLOB NOT NULL PRIMARY KEY,
conn_id BLOB NOT NULL REFERENCES connections ON DELETE CASCADE,
@@ -153,7 +153,7 @@ CREATE TABLE conn_confirmations(
,
smp_reply_queues BLOB NULL,
smp_client_version INTEGER
) WITHOUT ROWID;
) WITHOUT ROWID, STRICT;
CREATE TABLE conn_invitations(
invitation_id BLOB NOT NULL PRIMARY KEY,
contact_conn_id BLOB REFERENCES connections ON DELETE SET NULL,
@@ -162,7 +162,7 @@ CREATE TABLE conn_invitations(
accepted INTEGER NOT NULL DEFAULT 0,
own_conn_info BLOB,
created_at TEXT NOT NULL DEFAULT(datetime('now'))
) WITHOUT ROWID;
) WITHOUT ROWID, STRICT;
CREATE TABLE ratchets(
conn_id BLOB NOT NULL PRIMARY KEY REFERENCES connections
ON DELETE CASCADE,
@@ -177,7 +177,7 @@ CREATE TABLE ratchets(
x3dh_pub_key_2 BLOB,
pq_priv_kem BLOB,
pq_pub_kem BLOB
) WITHOUT ROWID;
) WITHOUT ROWID, STRICT;
CREATE TABLE skipped_messages(
skipped_message_id INTEGER PRIMARY KEY,
conn_id BLOB NOT NULL REFERENCES ratchets
@@ -185,7 +185,7 @@ CREATE TABLE skipped_messages(
header_key BLOB NOT NULL,
msg_n INTEGER NOT NULL,
msg_key BLOB NOT NULL
);
) STRICT;
CREATE TABLE ntf_servers(
ntf_host TEXT NOT NULL,
ntf_port TEXT NOT NULL,
@@ -193,10 +193,10 @@ CREATE TABLE ntf_servers(
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now')),
PRIMARY KEY(ntf_host, ntf_port)
) WITHOUT ROWID;
) WITHOUT ROWID, STRICT;
CREATE TABLE ntf_tokens(
provider TEXT NOT NULL, -- apns
device_token TEXT NOT NULL, -- ! this field is mislabeled and is actually saved as binary
device_token BLOB NOT NULL,
ntf_host TEXT NOT NULL,
ntf_port TEXT NOT NULL,
tkn_id BLOB, -- token ID assigned by notifications server
@@ -213,7 +213,7 @@ tkn_dh_secret BLOB, -- DH secret for e2e encryption of notifications
PRIMARY KEY(provider, device_token, ntf_host, ntf_port),
FOREIGN KEY(ntf_host, ntf_port) REFERENCES ntf_servers
ON DELETE RESTRICT ON UPDATE CASCADE
) WITHOUT ROWID;
) WITHOUT ROWID, STRICT;
CREATE TABLE ntf_subscriptions(
conn_id BLOB NOT NULL,
smp_host TEXT NULL,
@@ -237,7 +237,7 @@ CREATE TABLE ntf_subscriptions(
ON DELETE SET NULL ON UPDATE CASCADE,
FOREIGN KEY(ntf_host, ntf_port) REFERENCES ntf_servers
ON DELETE RESTRICT ON UPDATE CASCADE
) WITHOUT ROWID;
) WITHOUT ROWID, STRICT;
CREATE TABLE commands(
command_id INTEGER PRIMARY KEY,
conn_id BLOB NOT NULL REFERENCES connections ON DELETE CASCADE,
@@ -252,7 +252,7 @@ CREATE TABLE commands(
failed INTEGER DEFAULT 0,
FOREIGN KEY(host, port) REFERENCES servers
ON DELETE RESTRICT ON UPDATE CASCADE
);
) STRICT;
CREATE TABLE snd_message_deliveries(
snd_message_delivery_id INTEGER PRIMARY KEY AUTOINCREMENT,
conn_id BLOB NOT NULL REFERENCES connections ON DELETE CASCADE,
@@ -260,13 +260,13 @@ CREATE TABLE snd_message_deliveries(
internal_id INTEGER NOT NULL,
failed INTEGER DEFAULT 0,
FOREIGN KEY(conn_id, internal_id) REFERENCES messages ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED
);
) STRICT;
CREATE TABLE sqlite_sequence(name,seq);
CREATE TABLE users(
user_id INTEGER PRIMARY KEY AUTOINCREMENT
,
deleted INTEGER DEFAULT 0 CHECK(deleted NOT NULL)
);
) STRICT;
CREATE TABLE xftp_servers(
xftp_server_id INTEGER PRIMARY KEY,
xftp_host TEXT NOT NULL,
@@ -275,7 +275,7 @@ CREATE TABLE xftp_servers(
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now')),
UNIQUE(xftp_host, xftp_port, xftp_key_hash)
);
) STRICT;
CREATE TABLE rcv_files(
rcv_file_id INTEGER PRIMARY KEY,
rcv_file_entity_id BLOB NOT NULL,
@@ -302,7 +302,7 @@ CREATE TABLE rcv_files(
redirect_digest BLOB,
approved_relays INTEGER NOT NULL DEFAULT 0,
UNIQUE(rcv_file_entity_id)
);
) STRICT;
CREATE TABLE rcv_file_chunks(
rcv_file_chunk_id INTEGER PRIMARY KEY,
rcv_file_id INTEGER NOT NULL REFERENCES rcv_files ON DELETE CASCADE,
@@ -312,7 +312,7 @@ CREATE TABLE rcv_file_chunks(
tmp_path TEXT,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
) STRICT;
CREATE TABLE rcv_file_chunk_replicas(
rcv_file_chunk_replica_id INTEGER PRIMARY KEY,
rcv_file_chunk_id INTEGER NOT NULL REFERENCES rcv_file_chunks ON DELETE CASCADE,
@@ -325,7 +325,7 @@ CREATE TABLE rcv_file_chunk_replicas(
retries INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
) STRICT;
CREATE TABLE snd_files(
snd_file_id INTEGER PRIMARY KEY,
snd_file_entity_id BLOB NOT NULL,
@@ -347,7 +347,7 @@ CREATE TABLE snd_files(
failed INTEGER DEFAULT 0,
redirect_size INTEGER,
redirect_digest BLOB
);
) STRICT;
CREATE TABLE snd_file_chunks(
snd_file_chunk_id INTEGER PRIMARY KEY,
snd_file_id INTEGER NOT NULL REFERENCES snd_files ON DELETE CASCADE,
@@ -357,7 +357,7 @@ CREATE TABLE snd_file_chunks(
digest BLOB NOT NULL,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
) STRICT;
CREATE TABLE snd_file_chunk_replicas(
snd_file_chunk_replica_id INTEGER PRIMARY KEY,
snd_file_chunk_id INTEGER NOT NULL REFERENCES snd_file_chunks ON DELETE CASCADE,
@@ -370,7 +370,7 @@ CREATE TABLE snd_file_chunk_replicas(
retries INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
) STRICT;
CREATE TABLE snd_file_chunk_replica_recipients(
snd_file_chunk_replica_recipient_id INTEGER PRIMARY KEY,
snd_file_chunk_replica_id INTEGER NOT NULL REFERENCES snd_file_chunk_replicas ON DELETE CASCADE,
@@ -378,7 +378,7 @@ CREATE TABLE snd_file_chunk_replica_recipients(
rcv_replica_key BLOB NOT NULL,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
) STRICT;
CREATE TABLE deleted_snd_chunk_replicas(
deleted_snd_chunk_replica_id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE,
@@ -392,28 +392,28 @@ CREATE TABLE deleted_snd_chunk_replicas(
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
,
failed INTEGER DEFAULT 0
);
) STRICT;
CREATE TABLE encrypted_rcv_message_hashes(
encrypted_rcv_message_hash_id INTEGER PRIMARY KEY,
conn_id BLOB NOT NULL REFERENCES connections ON DELETE CASCADE,
hash BLOB NOT NULL,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
) STRICT;
CREATE TABLE processed_ratchet_key_hashes(
processed_ratchet_key_hash_id INTEGER PRIMARY KEY,
conn_id BLOB NOT NULL REFERENCES connections ON DELETE CASCADE,
hash BLOB NOT NULL,
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
) STRICT;
CREATE TABLE servers_stats(
servers_stats_id INTEGER PRIMARY KEY,
servers_stats TEXT,
started_at TEXT NOT NULL DEFAULT(datetime('now')),
created_at TEXT NOT NULL DEFAULT(datetime('now')),
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
);
) STRICT;
CREATE TABLE ntf_tokens_to_delete(
ntf_token_to_delete_id INTEGER PRIMARY KEY,
ntf_host TEXT NOT NULL,
@@ -423,11 +423,11 @@ CREATE TABLE ntf_tokens_to_delete(
tkn_priv_key BLOB NOT NULL, -- client's private key to sign token commands,
del_failed INTEGER DEFAULT 0,
created_at TEXT NOT NULL DEFAULT(datetime('now'))
);
) STRICT;
CREATE TABLE snd_message_bodies(
snd_message_body_id INTEGER PRIMARY KEY,
agent_msg BLOB NOT NULL DEFAULT x''
);
) STRICT;
CREATE TABLE inv_short_links(
inv_short_link_id INTEGER PRIMARY KEY AUTOINCREMENT,
host TEXT NOT NULL,
@@ -438,7 +438,7 @@ CREATE TABLE inv_short_links(
snd_private_key BLOB NOT NULL,
snd_id BLOB,
FOREIGN KEY(host, port) REFERENCES servers ON DELETE RESTRICT ON UPDATE CASCADE
);
) STRICT;
CREATE TABLE client_notices(
client_notice_id INTEGER PRIMARY KEY AUTOINCREMENT,
protocol TEXT NOT NULL,
@@ -449,7 +449,7 @@ CREATE TABLE client_notices(
notice_ttl INTEGER,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
) STRICT;
CREATE UNIQUE INDEX idx_rcv_queues_ntf ON rcv_queues(host, port, ntf_id);
CREATE UNIQUE INDEX idx_rcv_queue_id ON rcv_queues(conn_id, rcv_queue_id);
CREATE UNIQUE INDEX idx_snd_queue_id ON snd_queues(conn_id, snd_queue_id);

View File

@@ -13,8 +13,10 @@ import Simplex.Messaging.Agent.Protocol (ConnId, NotificationsMode (..), UserId)
import Simplex.Messaging.Agent.Store.DB (Binary (..), FromField (..), ToField (..), blobFieldDecoder, fromTextField_)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Protocol (NotifierId, NtfServer, SMPServer)
import Simplex.Messaging.Util (eitherToMaybe)
data NtfTknAction
= NTARegister
@@ -101,42 +103,40 @@ data NtfSubNTFAction
| NSARotate -- deprecated
deriving (Show)
instance Encoding NtfSubNTFAction where
smpEncode = \case
instance TextEncoding NtfSubNTFAction where
textEncode = \case
NSACreate -> "N"
NSACheck -> "C"
NSADelete -> "D"
NSARotate -> "R"
smpP =
A.anyChar >>= \case
'N' -> pure NSACreate
'C' -> pure NSACheck
'D' -> pure NSADelete
'R' -> pure NSARotate
_ -> fail "bad NtfSubNTFAction"
textDecode = \case
"N" -> Just NSACreate
"C" -> Just NSACheck
"D" -> Just NSADelete
"R" -> Just NSARotate
_ -> Nothing
instance FromField NtfSubNTFAction where fromField = blobFieldDecoder smpDecode
instance FromField NtfSubNTFAction where fromField = fromTextField_ textDecode
instance ToField NtfSubNTFAction where toField = toField . Binary . smpEncode
instance ToField NtfSubNTFAction where toField = toField . textEncode
data NtfSubSMPAction
= NSASmpKey
| NSASmpDelete
deriving (Show)
instance Encoding NtfSubSMPAction where
smpEncode = \case
instance TextEncoding NtfSubSMPAction where
textEncode = \case
NSASmpKey -> "K"
NSASmpDelete -> "D"
smpP =
A.anyChar >>= \case
'K' -> pure NSASmpKey
'D' -> pure NSASmpDelete
_ -> fail "bad NtfSubSMPAction"
textDecode = \case
"K" -> Just NSASmpKey
"D" -> Just NSASmpDelete
_ -> Nothing
instance FromField NtfSubSMPAction where fromField = blobFieldDecoder smpDecode
instance FromField NtfSubSMPAction where fromField = fromTextField_ textDecode
instance ToField NtfSubSMPAction where toField = toField . Binary . smpEncode
instance ToField NtfSubSMPAction where toField = toField . textEncode
data NtfAgentSubStatus
= -- | subscription started
@@ -171,7 +171,7 @@ instance Encoding NtfAgentSubStatus where
"DELETED" -> pure NASDeleted
_ -> fail "bad NtfAgentSubStatus"
instance FromField NtfAgentSubStatus where fromField = fromTextField_ $ either (const Nothing) Just . smpDecode . encodeUtf8
instance FromField NtfAgentSubStatus where fromField = fromTextField_ $ eitherToMaybe . smpDecode . encodeUtf8
instance ToField NtfAgentSubStatus where toField = toField . decodeLatin1 . smpEncode