diff --git a/simplexmq.cabal b/simplexmq.cabal index 94ddf0203..ed7920748 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -62,6 +62,7 @@ library Simplex.Messaging.Agent.Server Simplex.Messaging.Agent.Store Simplex.Messaging.Agent.Store.SQLite + Simplex.Messaging.Agent.Store.SQLite.DB Simplex.Messaging.Agent.Store.SQLite.Migrations Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220101_initial Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220301_snd_queue_keys diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 38d444b95..db090e5bf 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -122,7 +122,6 @@ import Data.Text (Text) import qualified Data.Text as T import Data.Time.Clock import Data.Time.Clock.System (systemToUTCTime) -import qualified Database.SQLite.Simple as DB import Simplex.FileTransfer.Agent (closeXFTPAgent, deleteRcvFile, deleteSndFileInternal, deleteSndFileRemote, receiveFile, sendFile, startWorkers, toFSFilePath) import Simplex.FileTransfer.Description (ValidFileDescription) import Simplex.FileTransfer.Protocol (FileParty (..)) @@ -135,6 +134,7 @@ import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite +import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations import Simplex.Messaging.Client (ProtocolClient (..), ServerTransmission) import qualified Simplex.Messaging.Crypto as C @@ -1720,7 +1720,7 @@ execAgentStoreSQL' :: AgentMonad m => AgentClient -> Text -> m [Text] execAgentStoreSQL' c sql = withStore' c (`execSQL` sql) getAgentMigrations' :: AgentMonad m => AgentClient -> m [UpMigration] -getAgentMigrations' c = map upMigration <$> withStore' c Migrations.getCurrent +getAgentMigrations' c = map upMigration <$> withStore' c (Migrations.getCurrent . DB.conn) debugAgentLocks' :: AgentMonad' m => AgentClient -> m AgentLocks debugAgentLocks' AgentClient {connLocks = cs, reconnectLocks = rs, deleteLock = d} = do diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index bc65b4a96..09de8166d 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -129,9 +129,7 @@ import Data.Set (Set) import qualified Data.Set as S import Data.Text.Encoding import Data.Time (UTCTime, defaultTimeLocale, formatTime, getCurrentTime) -import Data.Time.Clock (diffUTCTime) import Data.Word (Word16) -import qualified Database.SQLite.Simple as DB import GHC.Generics (Generic) import Network.Socket (HostName) import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError) @@ -147,6 +145,7 @@ import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..), withTransaction) +import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB import Simplex.Messaging.Agent.TAsyncs import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues) import qualified Simplex.Messaging.Agent.TRcvQueues as RQ diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 586990e82..75d5761eb 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -203,6 +203,7 @@ module Simplex.Messaging.Agent.Store.SQLite -- * utilities withConnection, + withConnection', withTransaction, withTransactionCtx, firstRow, @@ -236,8 +237,8 @@ import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1, encodeUtf8) import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, diffUTCTime, getCurrentTime) import Data.Word (Word32) -import Database.SQLite.Simple (FromRow, NamedParam (..), Only (..), Query (..), SQLError, ToRow, field, (:.) (..)) -import qualified Database.SQLite.Simple as DB +import Database.SQLite.Simple (FromRow (..), NamedParam (..), Only (..), Query (..), SQLError, ToRow (..), field, (:.) (..)) +import qualified Database.SQLite.Simple as SQL import Database.SQLite.Simple.FromField import Database.SQLite.Simple.QQ (sql) import Database.SQLite.Simple.ToField (ToField (..)) @@ -251,6 +252,7 @@ import Simplex.FileTransfer.Types import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval (RI2State (..)) import Simplex.Messaging.Agent.Store +import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB import Simplex.Messaging.Agent.Store.SQLite.Migrations (DownMigration (..), MTRError, Migration (..), MigrationsToRun (..), mtrErrorDescription) import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations import qualified Simplex.Messaging.Crypto as C @@ -338,7 +340,7 @@ createSQLiteStore dbFilePath dbKey migrations confirmMigrations = do Left e -> closeSQLiteStore st $> Left e migrateSchema :: SQLiteStore -> [Migration] -> MigrationConfirmation -> IO (Either MigrationError ()) -migrateSchema st migrations confirmMigrations = withConnection st $ \db -> do +migrateSchema st migrations confirmMigrations = withConnection' st $ \db -> do Migrations.initialize db Migrations.get db migrations >>= \case Left e -> do @@ -392,7 +394,7 @@ connectDB path key = do pure db where prepare db = do - let exec = SQLite3.exec $ DB.connectionHandle db + let exec = SQLite3.exec $ SQL.connectionHandle $ DB.conn db unless (null key) . exec $ "PRAGMA key = " <> sqlString key <> ";" exec . fromQuery $ [sql| @@ -425,7 +427,7 @@ sqlString s = quote <> T.replace quote "''" (T.pack s) <> quote execSQL :: DB.Connection -> Text -> IO [Text] execSQL db query = do rs <- newIORef [] - SQLite3.execWithCallback (DB.connectionHandle db) query (addSQLResultRow rs) + SQLite3.execWithCallback (SQL.connectionHandle $ DB.conn db) query (addSQLResultRow rs) reverse <$> readIORef rs addSQLResultRow :: IORef [Text] -> SQLite3.ColumnIndex -> [Text] -> [Maybe Text] -> IO () @@ -440,7 +442,7 @@ checkConstraint err action = action `E.catch` (pure . Left . handleSQLError err) handleSQLError :: StoreError -> SQLError -> StoreError handleSQLError err e - | DB.sqlError e == DB.ErrorConstraint = err + | SQL.sqlError e == SQL.ErrorConstraint = err | otherwise = SEInternal $ bshow e withConnection :: SQLiteStore -> (DB.Connection -> IO a) -> IO a @@ -449,6 +451,13 @@ withConnection SQLiteStore {dbConnection} = (atomically $ takeTMVar dbConnection) (atomically . putTMVar dbConnection) +withConnection' :: SQLiteStore -> (SQL.Connection -> IO a) -> IO a +withConnection' SQLiteStore {dbConnection} a = + bracket + (atomically $ takeTMVar dbConnection) + (atomically . putTMVar dbConnection) + (a . DB.conn) + withTransaction :: forall a. SQLiteStore -> (DB.Connection -> IO a) -> IO a withTransaction = withTransactionCtx Nothing @@ -456,19 +465,19 @@ withTransactionCtx :: forall a. Maybe String -> SQLiteStore -> (DB.Connection -> withTransactionCtx ctx_ st action = withConnection st $ loop 500 3_000_000 where loop :: Int -> Int -> DB.Connection -> IO a - loop t tLim db = + loop t tLim db@DB.Connection {conn} = transactionWithCtx `E.catch` \(e :: SQLError) -> - if tLim > t && DB.sqlError e == DB.ErrorBusy + if tLim > t && SQL.sqlError e == SQL.ErrorBusy then do threadDelay t loop (t * 9 `div` 8) (tLim - t) db else E.throwIO e where transactionWithCtx = case ctx_ of - Nothing -> DB.withImmediateTransaction db (action db) + Nothing -> SQL.withImmediateTransaction conn (action db) Just ctx -> do t1 <- getCurrentTime - r <- DB.withImmediateTransaction db (action db) + r <- SQL.withImmediateTransaction conn (action db) t2 <- getCurrentTime putStrLn $ "withTransactionCtx start :: " <> show t1 <> " :: " <> ctx putStrLn $ "withTransactionCtx end :: " <> show t2 <> " :: " <> ctx <> " :: duration=" <> show (diffToMilliseconds $ diffUTCTime t2 t1) @@ -2090,7 +2099,7 @@ createWithRandomId gVar create = tryCreate 3 E.try (create id') >>= \case Right _ -> pure $ Right id' Left e - | DB.sqlError e == DB.ErrorConstraint -> tryCreate (n - 1) + | SQL.sqlError e == SQL.ErrorConstraint -> tryCreate (n - 1) | otherwise -> pure . Left . SEInternal $ bshow e randomId :: TVar ChaChaDRG -> Int -> IO ByteString diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/DB.hs b/src/Simplex/Messaging/Agent/Store/SQLite/DB.hs new file mode 100644 index 000000000..cb892ffb3 --- /dev/null +++ b/src/Simplex/Messaging/Agent/Store/SQLite/DB.hs @@ -0,0 +1,78 @@ +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE StrictData #-} + +module Simplex.Messaging.Agent.Store.SQLite.DB + ( Connection (..), + open, + close, + execute, + execute_, + executeNamed, + executeMany, + query, + query_, + queryNamed, + ) +where + +import Control.Concurrent.STM +import Control.Monad (when) +import Data.Int (Int64) +import Database.SQLite.Simple (FromRow, NamedParam, Query, ToRow) +import qualified Database.SQLite.Simple as SQL +import Data.Time (diffUTCTime, getCurrentTime) +import Simplex.Messaging.TMap (TMap) +import qualified Simplex.Messaging.TMap as TM +import Simplex.Messaging.Util (diffToMilliseconds) + +data Connection = Connection + { conn :: SQL.Connection, + slow :: TMap Query Int64 + } + +timeIt :: TMap Query Int64 -> Query -> IO a -> IO a +timeIt slow sql a = do + t <- getCurrentTime + r <- a + t' <- getCurrentTime + let diff = diffToMilliseconds $ diffUTCTime t' t + update = Just . maybe diff (max diff) + atomically $ when (diff > 50) $ TM.alter update sql slow + pure r + +open :: String -> IO Connection +open f = do + conn <- SQL.open f + slow <- atomically $ TM.empty + pure Connection {conn, slow} + +close :: Connection -> IO () +close = SQL.close . conn + +execute :: ToRow q => Connection -> Query -> q -> IO () +execute Connection {conn, slow} sql = timeIt slow sql . SQL.execute conn sql +{-# INLINE execute #-} + +execute_ :: Connection -> Query -> IO () +execute_ Connection {conn, slow} sql = timeIt slow sql $ SQL.execute_ conn sql +{-# INLINE execute_ #-} + +executeNamed :: Connection -> Query -> [NamedParam] -> IO () +executeNamed Connection {conn, slow} sql = timeIt slow sql . SQL.executeNamed conn sql +{-# INLINE executeNamed #-} + +executeMany :: ToRow q => Connection -> Query -> [q] -> IO () +executeMany Connection {conn, slow} sql = timeIt slow sql . SQL.executeMany conn sql +{-# INLINE executeMany #-} + +query :: (ToRow q, FromRow r) => Connection -> Query -> q -> IO [r] +query Connection {conn, slow} sql = timeIt slow sql . SQL.query conn sql +{-# INLINE query #-} + +query_ :: FromRow r => Connection -> Query -> IO [r] +query_ Connection {conn, slow} sql = timeIt slow sql $ SQL.query_ conn sql +{-# INLINE query_ #-} + +queryNamed :: FromRow r => Connection -> Query -> [NamedParam] -> IO [r] +queryNamed Connection {conn, slow} sql = timeIt slow sql . SQL.queryNamed conn sql +{-# INLINE queryNamed #-} diff --git a/tests/AgentTests/MigrationTests.hs b/tests/AgentTests/MigrationTests.hs index 54c4cd554..e3a147644 100644 --- a/tests/AgentTests/MigrationTests.hs +++ b/tests/AgentTests/MigrationTests.hs @@ -6,8 +6,8 @@ import Control.Monad import Data.Maybe (fromJust) import Data.Word (Word32) import Database.SQLite.Simple (fromOnly) -import qualified Database.SQLite.Simple as DB import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), MigrationError (MEDowngrade, MEUpgrade, MigrationError), SQLiteStore, closeSQLiteStore, createSQLiteStore, upMigration, withTransaction) +import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB import Simplex.Messaging.Agent.Store.SQLite.Migrations import System.Directory (removeFile) import System.Random (randomIO) diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index 3aa0721d4..7943ca425 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -18,13 +18,14 @@ import qualified Data.Text as T import Data.Text.Encoding (encodeUtf8) import Data.Time import Data.Word (Word32) -import qualified Database.SQLite.Simple as DB +import qualified Database.SQLite.Simple as SQL import Database.SQLite.Simple.QQ (sql) import SMPClient (testKeyHash) import Simplex.Messaging.Agent.Client () import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite +import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations import qualified Simplex.Messaging.Crypto as C import qualified Simplex.Messaging.Protocol as SMP @@ -137,7 +138,7 @@ testForeignKeysEnabled = ('smp.simplex.im', '5223', '1234', '2345', x'', x'', 'new'); |] DB.execute_ db inconsistentQuery - `shouldThrow` (\e -> DB.sqlError e == DB.ErrorConstraint) + `shouldThrow` (\e -> SQL.sqlError e == SQL.ErrorConstraint) cData1 :: ConnData cData1 = ConnData {userId = 1, connId = "conn1", connAgentVersion = 1, enableNtfs = True, duplexHandshake = Nothing, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk} diff --git a/tests/AgentTests/SchemaDump.hs b/tests/AgentTests/SchemaDump.hs index 8154e3c01..769ae9e5b 100644 --- a/tests/AgentTests/SchemaDump.hs +++ b/tests/AgentTests/SchemaDump.hs @@ -50,13 +50,13 @@ testSchemaMigrations = do putStrLn $ "down migration " <> name m let downMigr = fromJust $ toDownMigration m schema <- getSchema testDB testSchema - withConnection st (`Migrations.run` MTRUp [m]) + withConnection' st (`Migrations.run` MTRUp [m]) schema' <- getSchema testDB testSchema schema' `shouldNotBe` schema - withConnection st (`Migrations.run` MTRDown [downMigr]) + withConnection' st (`Migrations.run` MTRDown [downMigr]) schema'' <- getSchema testDB testSchema schema'' `shouldBe` schema - withConnection st (`Migrations.run` MTRUp [m]) + withConnection' st (`Migrations.run` MTRUp [m]) schema''' <- getSchema testDB testSchema schema''' `shouldBe` schema'