From 83721240a4f326c5859d8509adbfb513c6062186 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Sat, 13 Sep 2025 09:30:01 +0100 Subject: [PATCH] reset db connection on errors (#1633) --- src/Simplex/Messaging/Agent/Store/Postgres.hs | 3 +- .../Messaging/Agent/Store/Postgres/Common.hs | 31 +++++++++++-------- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Store/Postgres.hs b/src/Simplex/Messaging/Agent/Store/Postgres.hs index 075e4be48..9752c547f 100644 --- a/src/Simplex/Messaging/Agent/Store/Postgres.hs +++ b/src/Simplex/Messaging/Agent/Store/Postgres.hs @@ -56,7 +56,8 @@ connectPostgresStore DBOpts {connstr, schema, poolSize, createSchema} = do dbPriorityPool <- newDBStorePool poolSize dbPool <- newDBStorePool poolSize dbClosed <- newTVarIO True - let st = DBStore {dbConnstr = connstr, dbSchema = schema, dbPoolSize = fromIntegral poolSize, dbPriorityPool, dbPool, dbNew = False, dbClosed} + let dbConnect = fst <$> connectDB connstr schema False + st = DBStore {dbConnstr = connstr, dbSchema = schema, dbPoolSize = fromIntegral poolSize, dbPriorityPool, dbPool, dbConnect, dbNew = False, dbClosed} dbNew <- connectStore st createSchema pure st {dbNew} diff --git a/src/Simplex/Messaging/Agent/Store/Postgres/Common.hs b/src/Simplex/Messaging/Agent/Store/Postgres/Common.hs index 3ca0a755e..fac2c1c10 100644 --- a/src/Simplex/Messaging/Agent/Store/Postgres/Common.hs +++ b/src/Simplex/Messaging/Agent/Store/Postgres/Common.hs @@ -2,6 +2,7 @@ {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE QuasiQuotes #-} +{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} module Simplex.Messaging.Agent.Store.Postgres.Common @@ -19,7 +20,7 @@ where import Control.Concurrent.MVar import Control.Concurrent.STM -import Control.Exception (bracket) +import qualified Control.Exception as E import Data.ByteString (ByteString) import qualified Database.PostgreSQL.Simple as PSQL import Numeric.Natural (Natural) @@ -32,11 +33,7 @@ data DBStore = DBStore dbPoolSize :: Int, dbPriorityPool :: DBStorePool, dbPool :: DBStorePool, - -- dbPoolSize :: Int, - -- dbPool :: TBQueue PSQL.Connection, - -- -- MVar is needed for fair pool distribution, without STM retry contention. - -- -- Only one thread can be blocked on STM read. - -- dbSem :: MVar (), + dbConnect :: IO PSQL.Connection, dbClosed :: TVar Bool, dbNew :: Bool } @@ -55,15 +52,23 @@ data DBStorePool = DBStorePool } withConnectionPriority :: DBStore -> Bool -> (PSQL.Connection -> IO a) -> IO a -withConnectionPriority DBStore {dbPriorityPool, dbPool} priority = - withConnectionPool $ if priority then dbPriorityPool else dbPool +withConnectionPriority DBStore {dbPriorityPool, dbPool, dbConnect} priority = + withConnectionPool (if priority then dbPriorityPool else dbPool) dbConnect {-# INLINE withConnectionPriority #-} -withConnectionPool :: DBStorePool -> (PSQL.Connection -> IO a) -> IO a -withConnectionPool DBStorePool {dbPoolConns, dbSem} = - bracket - (withMVar dbSem $ \_ -> atomically $ readTBQueue dbPoolConns) - (atomically . writeTBQueue dbPoolConns) +withConnectionPool :: DBStorePool -> IO PSQL.Connection -> (PSQL.Connection -> IO a) -> IO a +withConnectionPool DBStorePool {dbPoolConns, dbSem} dbConnect action = + E.mask $ \restore -> do + conn <- withMVar dbSem $ \_ -> atomically $ readTBQueue dbPoolConns + r <- restore (action conn) `E.onException` reset conn + atomically $ writeTBQueue dbPoolConns conn + pure r + where + reset conn = do + conn' <- E.try dbConnect >>= \case + Right conn' -> PSQL.close conn >> pure conn' + Left (_ :: E.SomeException) -> pure conn + atomically $ writeTBQueue dbPoolConns conn' withConnection :: DBStore -> (PSQL.Connection -> IO a) -> IO a withConnection st = withConnectionPriority st False