diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index f834927d8..198fabdba 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1994,7 +1994,7 @@ subscriber :: AgentClient -> AM' () subscriber c@AgentClient {subQ, msgQ} = forever $ do t <- atomically $ readTBQueue msgQ agentOperationBracket c AORcvNetwork waitUntilActive $ - runExceptT (processSMPTransmission c t) >>= \case + tryAgentError' (processSMPTransmission c t) >>= \case Left e -> do logError $ tshow e atomically $ writeTBQueue subQ ("", "", APC SAEConn $ ERR e) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index d9b059d59..4c359a519 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -172,6 +172,7 @@ import Data.Text.Encoding import Data.Time (UTCTime, defaultTimeLocale, diffUTCTime, formatTime, getCurrentTime) import Data.Time.Clock.System (getSystemTime) import Data.Word (Word16) +import qualified Database.SQLite.Simple as SQL import Network.Socket (HostName) import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError) import qualified Simplex.FileTransfer.Client as X @@ -1621,10 +1622,16 @@ withStore :: AgentClient -> (DB.Connection -> IO (Either StoreError a)) -> AM a withStore c action = do st <- asks store withExceptT storeError . ExceptT . liftIO . agentOperationBracket c AODatabase (\_ -> pure ()) $ - withTransaction st action `E.catch` handleInternal "" + withTransaction st action `E.catches` handleDBErrors where - handleInternal :: String -> E.SomeException -> IO (Either StoreError a) - handleInternal ctxStr e = pure . Left . SEInternal . B.pack $ show e <> ctxStr + handleDBErrors :: [E.Handler IO (Either StoreError a)] + handleDBErrors = + [ E.Handler $ \(e :: SQL.SQLError) -> + let se = SQL.sqlError e + busy = se == SQL.ErrorBusy || se == SQL.ErrorLocked + in pure . Left . (if busy then SEDatabaseBusy else SEInternal) $ bshow se, + E.Handler $ \(E.SomeException e) -> pure . Left $ SEInternal $ bshow e + ] withStoreBatch :: Traversable t => AgentClient -> (DB.Connection -> t (IO (Either AgentErrorType a))) -> AM' (t (Either AgentErrorType a)) withStoreBatch c actions = do @@ -1652,6 +1659,7 @@ storeError = \case -- it is used to wrap agent operations when "transaction-like" store access is needed -- NOTE: network IO should NOT be used inside AgentStoreMonad SEAgentError e -> e + SEDatabaseBusy e -> CRITICAL True $ B.unpack e e -> INTERNAL $ show e incStat :: AgentClient -> Int -> AgentStatsKey -> STM () diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index ce76d5c89..b3decd8f0 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -30,7 +30,7 @@ import Data.Type.Equality import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.RetryInterval (RI2State) import qualified Simplex.Messaging.Crypto as C -import Simplex.Messaging.Crypto.Ratchet (RatchetX448, PQEncryption, PQSupport) +import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport, RatchetX448) import Simplex.Messaging.Encoding.String import Simplex.Messaging.Protocol ( MsgBody, @@ -593,6 +593,8 @@ type AsyncCmdId = Int64 data StoreError = -- | IO exceptions in store actions. SEInternal ByteString + | -- | Database busy + SEDatabaseBusy ByteString | -- | Failed to generate unique random ID SEUniqueID | -- | User ID not found diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index b8b1c7c52..beac334fb 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -221,6 +221,7 @@ module Simplex.Messaging.Agent.Store.SQLite ) where +import Control.Logger.Simple import Control.Monad import Control.Monad.Except import Control.Monad.IO.Class @@ -268,7 +269,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations (DownMigration (..), MTRE import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs (..)) -import Simplex.Messaging.Crypto.Ratchet (RatchetX448, SkippedMsgDiff (..), SkippedMsgKeys, PQEncryption (..), PQSupport (..)) +import Simplex.Messaging.Crypto.Ratchet (PQEncryption (..), PQSupport (..), RatchetX448, SkippedMsgDiff (..), SkippedMsgKeys) import qualified Simplex.Messaging.Crypto.Ratchet as CR import Simplex.Messaging.Encoding import Simplex.Messaging.Encoding.String @@ -278,7 +279,7 @@ import Simplex.Messaging.Parsers (blobFieldParser, defaultJSON, dropPrefix, from import Simplex.Messaging.Protocol import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Transport.Client (TransportHost) -import Simplex.Messaging.Util (bshow, catchAllErrors, eitherToMaybe, ifM, safeDecodeUtf8, ($>>=), (<$$>)) +import Simplex.Messaging.Util (bshow, catchAllErrors, eitherToMaybe, ifM, safeDecodeUtf8, tshow, ($>>=), (<$$>)) import Simplex.Messaging.Version.Internal import System.Directory (copyFile, createDirectoryIfMissing, doesFileExist) import System.Exit (exitFailure) @@ -1272,12 +1273,16 @@ createCommand :: DB.Connection -> ACorrId -> ConnId -> Maybe SMPServer -> AgentC createCommand db corrId connId srv_ cmd = runExceptT $ do (host_, port_, serverKeyHash_) <- serverFields createdAt <- liftIO getCurrentTime - liftIO $ + liftIO . E.handle handleErr $ DB.execute db "INSERT INTO commands (host, port, corr_id, conn_id, command_tag, command, server_key_hash, created_at) VALUES (?,?,?,?,?,?,?,?)" - (host_, port_, corrId, connId, agentCommandTag cmd, cmd, serverKeyHash_, createdAt) + (host_, port_, corrId, connId, cmdTag, cmd, serverKeyHash_, createdAt) where + cmdTag = agentCommandTag cmd + handleErr e + | SQL.sqlError e == SQL.ErrorConstraint = logError $ "tried to create command " <> tshow cmdTag <> " for deleted connection" + | otherwise = E.throwIO e serverFields :: ExceptT StoreError IO (Maybe (NonEmpty TransportHost), Maybe ServiceName, Maybe C.KeyHash) serverFields = case srv_ of Just srv@(SMPServer host port _) ->