mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 16:26:02 +00:00
agent: track queries (#1439)
This commit is contained in:
@@ -69,32 +69,33 @@ data DBOpts = DBOpts
|
||||
{ dbFilePath :: FilePath,
|
||||
dbKey :: ScrubbedBytes,
|
||||
keepKey :: Bool,
|
||||
vacuum :: Bool
|
||||
vacuum :: Bool,
|
||||
track :: DB.TrackQueries
|
||||
}
|
||||
|
||||
createDBStore :: DBOpts -> [Migration] -> MigrationConfirmation -> IO (Either MigrationError DBStore)
|
||||
createDBStore DBOpts {dbFilePath, dbKey, keepKey, vacuum} migrations confirmMigrations = do
|
||||
createDBStore DBOpts {dbFilePath, dbKey, keepKey, track, vacuum} migrations confirmMigrations = do
|
||||
let dbDir = takeDirectory dbFilePath
|
||||
createDirectoryIfMissing True dbDir
|
||||
st <- connectSQLiteStore dbFilePath dbKey keepKey
|
||||
st <- connectSQLiteStore dbFilePath dbKey keepKey track
|
||||
r <- migrateSchema st migrations confirmMigrations vacuum `onException` closeDBStore st
|
||||
case r of
|
||||
Right () -> pure $ Right st
|
||||
Left e -> closeDBStore st $> Left e
|
||||
|
||||
connectSQLiteStore :: FilePath -> ScrubbedBytes -> Bool -> IO DBStore
|
||||
connectSQLiteStore dbFilePath key keepKey = do
|
||||
connectSQLiteStore :: FilePath -> ScrubbedBytes -> Bool -> DB.TrackQueries -> IO DBStore
|
||||
connectSQLiteStore dbFilePath key keepKey track = do
|
||||
dbNew <- not <$> doesFileExist dbFilePath
|
||||
dbConn <- dbBusyLoop (connectDB dbFilePath key)
|
||||
dbConn <- dbBusyLoop (connectDB dbFilePath key track)
|
||||
dbConnection <- newMVar dbConn
|
||||
dbKey <- newTVarIO $! storeKey key keepKey
|
||||
dbClosed <- newTVarIO False
|
||||
dbSem <- newTVarIO 0
|
||||
pure DBStore {dbFilePath, dbKey, dbSem, dbConnection, dbNew, dbClosed}
|
||||
|
||||
connectDB :: FilePath -> ScrubbedBytes -> IO DB.Connection
|
||||
connectDB path key = do
|
||||
db <- DB.open path
|
||||
connectDB :: FilePath -> ScrubbedBytes -> DB.TrackQueries -> IO DB.Connection
|
||||
connectDB path key track = do
|
||||
db <- DB.open path track
|
||||
prepare db `onException` DB.close db
|
||||
-- _printPragmas db path
|
||||
pure db
|
||||
@@ -127,12 +128,12 @@ openSQLiteStore_ DBStore {dbConnection, dbFilePath, dbKey, dbClosed} key keepKey
|
||||
bracketOnError
|
||||
(takeMVar dbConnection)
|
||||
(tryPutMVar dbConnection)
|
||||
$ \DB.Connection {slow} -> do
|
||||
DB.Connection {conn} <- connectDB dbFilePath key
|
||||
$ \DB.Connection {slow, track} -> do
|
||||
DB.Connection {conn} <- connectDB dbFilePath key track
|
||||
atomically $ do
|
||||
writeTVar dbClosed False
|
||||
writeTVar dbKey $! storeKey key keepKey
|
||||
putMVar dbConnection DB.Connection {conn, slow}
|
||||
putMVar dbConnection DB.Connection {conn, slow, track}
|
||||
|
||||
reopenDBStore :: DBStore -> IO ()
|
||||
reopenDBStore st@DBStore {dbKey, dbClosed} =
|
||||
|
||||
@@ -11,6 +11,7 @@ module Simplex.Messaging.Agent.Store.SQLite.DB
|
||||
Binary (..),
|
||||
Connection (..),
|
||||
SlowQueryStats (..),
|
||||
TrackQueries (..),
|
||||
open,
|
||||
close,
|
||||
execute,
|
||||
@@ -38,7 +39,7 @@ import Database.SQLite.Simple.ToField (ToField (..))
|
||||
import Simplex.Messaging.Parsers (defaultJSON)
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Util (diffToMilliseconds, tshow)
|
||||
import Simplex.Messaging.Util (diffToMicroseconds, tshow)
|
||||
|
||||
newtype BoolInt = BI {unBI :: Bool}
|
||||
deriving newtype (FromField, ToField)
|
||||
@@ -48,9 +49,13 @@ newtype Binary = Binary {fromBinary :: ByteString}
|
||||
|
||||
data Connection = Connection
|
||||
{ conn :: SQL.Connection,
|
||||
track :: TrackQueries,
|
||||
slow :: TMap Query SlowQueryStats
|
||||
}
|
||||
|
||||
data TrackQueries = TQAll | TQSlow Int64 | TQOff
|
||||
deriving (Eq)
|
||||
|
||||
data SlowQueryStats = SlowQueryStats
|
||||
{ count :: Int64,
|
||||
timeMax :: Int64,
|
||||
@@ -59,22 +64,29 @@ data SlowQueryStats = SlowQueryStats
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
timeIt :: TMap Query SlowQueryStats -> Query -> IO a -> IO a
|
||||
timeIt slow sql a = do
|
||||
t <- getCurrentTime
|
||||
r <-
|
||||
a `catch` \e -> do
|
||||
atomically $ TM.alter (Just . updateQueryErrors e) sql slow
|
||||
throwIO e
|
||||
t' <- getCurrentTime
|
||||
let diff = diffToMilliseconds $ diffUTCTime t' t
|
||||
when (diff > 1) $ atomically $ TM.alter (updateQueryStats diff) sql slow
|
||||
pure r
|
||||
timeIt :: Connection -> Query -> IO a -> IO a
|
||||
timeIt Connection {slow, track} sql a
|
||||
| track == TQOff = makeQuery
|
||||
| otherwise = do
|
||||
t <- getCurrentTime
|
||||
r <- makeQuery
|
||||
t' <- getCurrentTime
|
||||
let diff = diffToMicroseconds $ diffUTCTime t' t
|
||||
when (trackQuery diff) $ atomically $ TM.alter (updateQueryStats diff) sql slow
|
||||
pure r
|
||||
where
|
||||
makeQuery =
|
||||
a `catch` \e -> do
|
||||
atomically $ TM.alter (Just . updateQueryErrors e) sql slow
|
||||
throwIO e
|
||||
trackQuery diff = case track of
|
||||
TQOff -> False
|
||||
TQSlow t -> diff > t
|
||||
TQAll -> True
|
||||
updateQueryErrors :: SomeException -> Maybe SlowQueryStats -> SlowQueryStats
|
||||
updateQueryErrors e Nothing = SlowQueryStats 0 0 0 $ M.singleton (tshow e) 1
|
||||
updateQueryErrors e (Just stats@SlowQueryStats {errs}) =
|
||||
stats {errs = M.alter (Just . maybe 1 (+ 1)) (tshow e) errs}
|
||||
updateQueryErrors e (Just st@SlowQueryStats {errs}) =
|
||||
st {errs = M.alter (Just . maybe 1 (+ 1)) (tshow e) errs}
|
||||
updateQueryStats :: Int64 -> Maybe SlowQueryStats -> Maybe SlowQueryStats
|
||||
updateQueryStats diff Nothing = Just $ SlowQueryStats 1 diff diff M.empty
|
||||
updateQueryStats diff (Just SlowQueryStats {count, timeMax, timeAvg, errs}) =
|
||||
@@ -86,33 +98,33 @@ timeIt slow sql a = do
|
||||
errs
|
||||
}
|
||||
|
||||
open :: String -> IO Connection
|
||||
open f = do
|
||||
open :: String -> TrackQueries -> IO Connection
|
||||
open f track = do
|
||||
conn <- SQL.open f
|
||||
slow <- TM.emptyIO
|
||||
pure Connection {conn, slow}
|
||||
pure Connection {conn, slow, track}
|
||||
|
||||
close :: Connection -> IO ()
|
||||
close = SQL.close . conn
|
||||
|
||||
execute :: ToRow q => Connection -> Query -> q -> IO ()
|
||||
execute Connection {conn, slow} sql = timeIt slow sql . SQL.execute conn sql
|
||||
execute c sql = timeIt c sql . SQL.execute (conn c) sql
|
||||
{-# INLINE execute #-}
|
||||
|
||||
execute_ :: Connection -> Query -> IO ()
|
||||
execute_ Connection {conn, slow} sql = timeIt slow sql $ SQL.execute_ conn sql
|
||||
execute_ c sql = timeIt c sql $ SQL.execute_ (conn c) sql
|
||||
{-# INLINE execute_ #-}
|
||||
|
||||
executeMany :: ToRow q => Connection -> Query -> [q] -> IO ()
|
||||
executeMany Connection {conn, slow} sql = timeIt slow sql . SQL.executeMany conn sql
|
||||
executeMany c sql = timeIt c sql . SQL.executeMany (conn c) sql
|
||||
{-# INLINE executeMany #-}
|
||||
|
||||
query :: (ToRow q, FromRow r) => Connection -> Query -> q -> IO [r]
|
||||
query Connection {conn, slow} sql = timeIt slow sql . SQL.query conn sql
|
||||
query c sql = timeIt c sql . SQL.query (conn c) sql
|
||||
{-# INLINE query #-}
|
||||
|
||||
query_ :: FromRow r => Connection -> Query -> IO [r]
|
||||
query_ Connection {conn, slow} sql = timeIt slow sql $ SQL.query_ conn sql
|
||||
query_ c sql = timeIt c sql $ SQL.query_ (conn c) sql
|
||||
{-# INLINE query_ #-}
|
||||
|
||||
$(J.deriveJSON defaultJSON ''SlowQueryStats)
|
||||
|
||||
Reference in New Issue
Block a user