mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-26 21:45:15 +00:00
agent: track slow SQL queries (#822)
* agent: track slow SQL queries * add executeMany * reduce threshold for slow queries to 50ms
This commit is contained in:
committed by
GitHub
parent
43f283471c
commit
e2065ab352
@@ -62,6 +62,7 @@ library
|
||||
Simplex.Messaging.Agent.Server
|
||||
Simplex.Messaging.Agent.Store
|
||||
Simplex.Messaging.Agent.Store.SQLite
|
||||
Simplex.Messaging.Agent.Store.SQLite.DB
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220101_initial
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20220301_snd_queue_keys
|
||||
|
||||
@@ -122,7 +122,6 @@ import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Data.Time.Clock
|
||||
import Data.Time.Clock.System (systemToUTCTime)
|
||||
import qualified Database.SQLite.Simple as DB
|
||||
import Simplex.FileTransfer.Agent (closeXFTPAgent, deleteRcvFile, deleteSndFileInternal, deleteSndFileRemote, receiveFile, sendFile, startWorkers, toFSFilePath)
|
||||
import Simplex.FileTransfer.Description (ValidFileDescription)
|
||||
import Simplex.FileTransfer.Protocol (FileParty (..))
|
||||
@@ -135,6 +134,7 @@ import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.RetryInterval
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Store.SQLite
|
||||
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
|
||||
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
|
||||
import Simplex.Messaging.Client (ProtocolClient (..), ServerTransmission)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
@@ -1720,7 +1720,7 @@ execAgentStoreSQL' :: AgentMonad m => AgentClient -> Text -> m [Text]
|
||||
execAgentStoreSQL' c sql = withStore' c (`execSQL` sql)
|
||||
|
||||
getAgentMigrations' :: AgentMonad m => AgentClient -> m [UpMigration]
|
||||
getAgentMigrations' c = map upMigration <$> withStore' c Migrations.getCurrent
|
||||
getAgentMigrations' c = map upMigration <$> withStore' c (Migrations.getCurrent . DB.conn)
|
||||
|
||||
debugAgentLocks' :: AgentMonad' m => AgentClient -> m AgentLocks
|
||||
debugAgentLocks' AgentClient {connLocks = cs, reconnectLocks = rs, deleteLock = d} = do
|
||||
|
||||
@@ -129,9 +129,7 @@ import Data.Set (Set)
|
||||
import qualified Data.Set as S
|
||||
import Data.Text.Encoding
|
||||
import Data.Time (UTCTime, defaultTimeLocale, formatTime, getCurrentTime)
|
||||
import Data.Time.Clock (diffUTCTime)
|
||||
import Data.Word (Word16)
|
||||
import qualified Database.SQLite.Simple as DB
|
||||
import GHC.Generics (Generic)
|
||||
import Network.Socket (HostName)
|
||||
import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError)
|
||||
@@ -147,6 +145,7 @@ import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.RetryInterval
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Store.SQLite (SQLiteStore (..), withTransaction)
|
||||
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
|
||||
import Simplex.Messaging.Agent.TAsyncs
|
||||
import Simplex.Messaging.Agent.TRcvQueues (TRcvQueues)
|
||||
import qualified Simplex.Messaging.Agent.TRcvQueues as RQ
|
||||
|
||||
@@ -203,6 +203,7 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
|
||||
-- * utilities
|
||||
withConnection,
|
||||
withConnection',
|
||||
withTransaction,
|
||||
withTransactionCtx,
|
||||
firstRow,
|
||||
@@ -236,8 +237,8 @@ import qualified Data.Text as T
|
||||
import Data.Text.Encoding (decodeLatin1, encodeUtf8)
|
||||
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, diffUTCTime, getCurrentTime)
|
||||
import Data.Word (Word32)
|
||||
import Database.SQLite.Simple (FromRow, NamedParam (..), Only (..), Query (..), SQLError, ToRow, field, (:.) (..))
|
||||
import qualified Database.SQLite.Simple as DB
|
||||
import Database.SQLite.Simple (FromRow (..), NamedParam (..), Only (..), Query (..), SQLError, ToRow (..), field, (:.) (..))
|
||||
import qualified Database.SQLite.Simple as SQL
|
||||
import Database.SQLite.Simple.FromField
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
import Database.SQLite.Simple.ToField (ToField (..))
|
||||
@@ -251,6 +252,7 @@ import Simplex.FileTransfer.Types
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.RetryInterval (RI2State (..))
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations (DownMigration (..), MTRError, Migration (..), MigrationsToRun (..), mtrErrorDescription)
|
||||
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
@@ -338,7 +340,7 @@ createSQLiteStore dbFilePath dbKey migrations confirmMigrations = do
|
||||
Left e -> closeSQLiteStore st $> Left e
|
||||
|
||||
migrateSchema :: SQLiteStore -> [Migration] -> MigrationConfirmation -> IO (Either MigrationError ())
|
||||
migrateSchema st migrations confirmMigrations = withConnection st $ \db -> do
|
||||
migrateSchema st migrations confirmMigrations = withConnection' st $ \db -> do
|
||||
Migrations.initialize db
|
||||
Migrations.get db migrations >>= \case
|
||||
Left e -> do
|
||||
@@ -392,7 +394,7 @@ connectDB path key = do
|
||||
pure db
|
||||
where
|
||||
prepare db = do
|
||||
let exec = SQLite3.exec $ DB.connectionHandle db
|
||||
let exec = SQLite3.exec $ SQL.connectionHandle $ DB.conn db
|
||||
unless (null key) . exec $ "PRAGMA key = " <> sqlString key <> ";"
|
||||
exec . fromQuery $
|
||||
[sql|
|
||||
@@ -425,7 +427,7 @@ sqlString s = quote <> T.replace quote "''" (T.pack s) <> quote
|
||||
execSQL :: DB.Connection -> Text -> IO [Text]
|
||||
execSQL db query = do
|
||||
rs <- newIORef []
|
||||
SQLite3.execWithCallback (DB.connectionHandle db) query (addSQLResultRow rs)
|
||||
SQLite3.execWithCallback (SQL.connectionHandle $ DB.conn db) query (addSQLResultRow rs)
|
||||
reverse <$> readIORef rs
|
||||
|
||||
addSQLResultRow :: IORef [Text] -> SQLite3.ColumnIndex -> [Text] -> [Maybe Text] -> IO ()
|
||||
@@ -440,7 +442,7 @@ checkConstraint err action = action `E.catch` (pure . Left . handleSQLError err)
|
||||
|
||||
handleSQLError :: StoreError -> SQLError -> StoreError
|
||||
handleSQLError err e
|
||||
| DB.sqlError e == DB.ErrorConstraint = err
|
||||
| SQL.sqlError e == SQL.ErrorConstraint = err
|
||||
| otherwise = SEInternal $ bshow e
|
||||
|
||||
withConnection :: SQLiteStore -> (DB.Connection -> IO a) -> IO a
|
||||
@@ -449,6 +451,13 @@ withConnection SQLiteStore {dbConnection} =
|
||||
(atomically $ takeTMVar dbConnection)
|
||||
(atomically . putTMVar dbConnection)
|
||||
|
||||
withConnection' :: SQLiteStore -> (SQL.Connection -> IO a) -> IO a
|
||||
withConnection' SQLiteStore {dbConnection} a =
|
||||
bracket
|
||||
(atomically $ takeTMVar dbConnection)
|
||||
(atomically . putTMVar dbConnection)
|
||||
(a . DB.conn)
|
||||
|
||||
withTransaction :: forall a. SQLiteStore -> (DB.Connection -> IO a) -> IO a
|
||||
withTransaction = withTransactionCtx Nothing
|
||||
|
||||
@@ -456,19 +465,19 @@ withTransactionCtx :: forall a. Maybe String -> SQLiteStore -> (DB.Connection ->
|
||||
withTransactionCtx ctx_ st action = withConnection st $ loop 500 3_000_000
|
||||
where
|
||||
loop :: Int -> Int -> DB.Connection -> IO a
|
||||
loop t tLim db =
|
||||
loop t tLim db@DB.Connection {conn} =
|
||||
transactionWithCtx `E.catch` \(e :: SQLError) ->
|
||||
if tLim > t && DB.sqlError e == DB.ErrorBusy
|
||||
if tLim > t && SQL.sqlError e == SQL.ErrorBusy
|
||||
then do
|
||||
threadDelay t
|
||||
loop (t * 9 `div` 8) (tLim - t) db
|
||||
else E.throwIO e
|
||||
where
|
||||
transactionWithCtx = case ctx_ of
|
||||
Nothing -> DB.withImmediateTransaction db (action db)
|
||||
Nothing -> SQL.withImmediateTransaction conn (action db)
|
||||
Just ctx -> do
|
||||
t1 <- getCurrentTime
|
||||
r <- DB.withImmediateTransaction db (action db)
|
||||
r <- SQL.withImmediateTransaction conn (action db)
|
||||
t2 <- getCurrentTime
|
||||
putStrLn $ "withTransactionCtx start :: " <> show t1 <> " :: " <> ctx
|
||||
putStrLn $ "withTransactionCtx end :: " <> show t2 <> " :: " <> ctx <> " :: duration=" <> show (diffToMilliseconds $ diffUTCTime t2 t1)
|
||||
@@ -2090,7 +2099,7 @@ createWithRandomId gVar create = tryCreate 3
|
||||
E.try (create id') >>= \case
|
||||
Right _ -> pure $ Right id'
|
||||
Left e
|
||||
| DB.sqlError e == DB.ErrorConstraint -> tryCreate (n - 1)
|
||||
| SQL.sqlError e == SQL.ErrorConstraint -> tryCreate (n - 1)
|
||||
| otherwise -> pure . Left . SEInternal $ bshow e
|
||||
|
||||
randomId :: TVar ChaChaDRG -> Int -> IO ByteString
|
||||
|
||||
@@ -0,0 +1,78 @@
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE StrictData #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Store.SQLite.DB
|
||||
( Connection (..),
|
||||
open,
|
||||
close,
|
||||
execute,
|
||||
execute_,
|
||||
executeNamed,
|
||||
executeMany,
|
||||
query,
|
||||
query_,
|
||||
queryNamed,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Control.Monad (when)
|
||||
import Data.Int (Int64)
|
||||
import Database.SQLite.Simple (FromRow, NamedParam, Query, ToRow)
|
||||
import qualified Database.SQLite.Simple as SQL
|
||||
import Data.Time (diffUTCTime, getCurrentTime)
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Util (diffToMilliseconds)
|
||||
|
||||
data Connection = Connection
|
||||
{ conn :: SQL.Connection,
|
||||
slow :: TMap Query Int64
|
||||
}
|
||||
|
||||
timeIt :: TMap Query Int64 -> Query -> IO a -> IO a
|
||||
timeIt slow sql a = do
|
||||
t <- getCurrentTime
|
||||
r <- a
|
||||
t' <- getCurrentTime
|
||||
let diff = diffToMilliseconds $ diffUTCTime t' t
|
||||
update = Just . maybe diff (max diff)
|
||||
atomically $ when (diff > 50) $ TM.alter update sql slow
|
||||
pure r
|
||||
|
||||
open :: String -> IO Connection
|
||||
open f = do
|
||||
conn <- SQL.open f
|
||||
slow <- atomically $ TM.empty
|
||||
pure Connection {conn, slow}
|
||||
|
||||
close :: Connection -> IO ()
|
||||
close = SQL.close . conn
|
||||
|
||||
execute :: ToRow q => Connection -> Query -> q -> IO ()
|
||||
execute Connection {conn, slow} sql = timeIt slow sql . SQL.execute conn sql
|
||||
{-# INLINE execute #-}
|
||||
|
||||
execute_ :: Connection -> Query -> IO ()
|
||||
execute_ Connection {conn, slow} sql = timeIt slow sql $ SQL.execute_ conn sql
|
||||
{-# INLINE execute_ #-}
|
||||
|
||||
executeNamed :: Connection -> Query -> [NamedParam] -> IO ()
|
||||
executeNamed Connection {conn, slow} sql = timeIt slow sql . SQL.executeNamed conn sql
|
||||
{-# INLINE executeNamed #-}
|
||||
|
||||
executeMany :: ToRow q => Connection -> Query -> [q] -> IO ()
|
||||
executeMany Connection {conn, slow} sql = timeIt slow sql . SQL.executeMany conn sql
|
||||
{-# INLINE executeMany #-}
|
||||
|
||||
query :: (ToRow q, FromRow r) => Connection -> Query -> q -> IO [r]
|
||||
query Connection {conn, slow} sql = timeIt slow sql . SQL.query conn sql
|
||||
{-# INLINE query #-}
|
||||
|
||||
query_ :: FromRow r => Connection -> Query -> IO [r]
|
||||
query_ Connection {conn, slow} sql = timeIt slow sql $ SQL.query_ conn sql
|
||||
{-# INLINE query_ #-}
|
||||
|
||||
queryNamed :: FromRow r => Connection -> Query -> [NamedParam] -> IO [r]
|
||||
queryNamed Connection {conn, slow} sql = timeIt slow sql . SQL.queryNamed conn sql
|
||||
{-# INLINE queryNamed #-}
|
||||
@@ -6,8 +6,8 @@ import Control.Monad
|
||||
import Data.Maybe (fromJust)
|
||||
import Data.Word (Word32)
|
||||
import Database.SQLite.Simple (fromOnly)
|
||||
import qualified Database.SQLite.Simple as DB
|
||||
import Simplex.Messaging.Agent.Store.SQLite (MigrationConfirmation (..), MigrationError (MEDowngrade, MEUpgrade, MigrationError), SQLiteStore, closeSQLiteStore, createSQLiteStore, upMigration, withTransaction)
|
||||
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations
|
||||
import System.Directory (removeFile)
|
||||
import System.Random (randomIO)
|
||||
|
||||
@@ -18,13 +18,14 @@ import qualified Data.Text as T
|
||||
import Data.Text.Encoding (encodeUtf8)
|
||||
import Data.Time
|
||||
import Data.Word (Word32)
|
||||
import qualified Database.SQLite.Simple as DB
|
||||
import qualified Database.SQLite.Simple as SQL
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
import SMPClient (testKeyHash)
|
||||
import Simplex.Messaging.Agent.Client ()
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.Store
|
||||
import Simplex.Messaging.Agent.Store.SQLite
|
||||
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
|
||||
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
@@ -137,7 +138,7 @@ testForeignKeysEnabled =
|
||||
('smp.simplex.im', '5223', '1234', '2345', x'', x'', 'new');
|
||||
|]
|
||||
DB.execute_ db inconsistentQuery
|
||||
`shouldThrow` (\e -> DB.sqlError e == DB.ErrorConstraint)
|
||||
`shouldThrow` (\e -> SQL.sqlError e == SQL.ErrorConstraint)
|
||||
|
||||
cData1 :: ConnData
|
||||
cData1 = ConnData {userId = 1, connId = "conn1", connAgentVersion = 1, enableNtfs = True, duplexHandshake = Nothing, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk}
|
||||
|
||||
@@ -50,13 +50,13 @@ testSchemaMigrations = do
|
||||
putStrLn $ "down migration " <> name m
|
||||
let downMigr = fromJust $ toDownMigration m
|
||||
schema <- getSchema testDB testSchema
|
||||
withConnection st (`Migrations.run` MTRUp [m])
|
||||
withConnection' st (`Migrations.run` MTRUp [m])
|
||||
schema' <- getSchema testDB testSchema
|
||||
schema' `shouldNotBe` schema
|
||||
withConnection st (`Migrations.run` MTRDown [downMigr])
|
||||
withConnection' st (`Migrations.run` MTRDown [downMigr])
|
||||
schema'' <- getSchema testDB testSchema
|
||||
schema'' `shouldBe` schema
|
||||
withConnection st (`Migrations.run` MTRUp [m])
|
||||
withConnection' st (`Migrations.run` MTRUp [m])
|
||||
schema''' <- getSchema testDB testSchema
|
||||
schema''' `shouldBe` schema'
|
||||
|
||||
|
||||
Reference in New Issue
Block a user