mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-15 03:05:08 +00:00
agent: do not throw exception when command is created for deleted connection (#1150)
* agent: do not throw exception when command is created for deleted connection * convert database busy/locked to critical alert
This commit is contained in:
committed by
GitHub
parent
91cc48aabe
commit
4455b8bd0e
@@ -1994,7 +1994,7 @@ subscriber :: AgentClient -> AM' ()
|
||||
subscriber c@AgentClient {subQ, msgQ} = forever $ do
|
||||
t <- atomically $ readTBQueue msgQ
|
||||
agentOperationBracket c AORcvNetwork waitUntilActive $
|
||||
runExceptT (processSMPTransmission c t) >>= \case
|
||||
tryAgentError' (processSMPTransmission c t) >>= \case
|
||||
Left e -> do
|
||||
logError $ tshow e
|
||||
atomically $ writeTBQueue subQ ("", "", APC SAEConn $ ERR e)
|
||||
|
||||
@@ -172,6 +172,7 @@ import Data.Text.Encoding
|
||||
import Data.Time (UTCTime, defaultTimeLocale, diffUTCTime, formatTime, getCurrentTime)
|
||||
import Data.Time.Clock.System (getSystemTime)
|
||||
import Data.Word (Word16)
|
||||
import qualified Database.SQLite.Simple as SQL
|
||||
import Network.Socket (HostName)
|
||||
import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError)
|
||||
import qualified Simplex.FileTransfer.Client as X
|
||||
@@ -1621,10 +1622,16 @@ withStore :: AgentClient -> (DB.Connection -> IO (Either StoreError a)) -> AM a
|
||||
withStore c action = do
|
||||
st <- asks store
|
||||
withExceptT storeError . ExceptT . liftIO . agentOperationBracket c AODatabase (\_ -> pure ()) $
|
||||
withTransaction st action `E.catch` handleInternal ""
|
||||
withTransaction st action `E.catches` handleDBErrors
|
||||
where
|
||||
handleInternal :: String -> E.SomeException -> IO (Either StoreError a)
|
||||
handleInternal ctxStr e = pure . Left . SEInternal . B.pack $ show e <> ctxStr
|
||||
handleDBErrors :: [E.Handler IO (Either StoreError a)]
|
||||
handleDBErrors =
|
||||
[ E.Handler $ \(e :: SQL.SQLError) ->
|
||||
let se = SQL.sqlError e
|
||||
busy = se == SQL.ErrorBusy || se == SQL.ErrorLocked
|
||||
in pure . Left . (if busy then SEDatabaseBusy else SEInternal) $ bshow se,
|
||||
E.Handler $ \(E.SomeException e) -> pure . Left $ SEInternal $ bshow e
|
||||
]
|
||||
|
||||
withStoreBatch :: Traversable t => AgentClient -> (DB.Connection -> t (IO (Either AgentErrorType a))) -> AM' (t (Either AgentErrorType a))
|
||||
withStoreBatch c actions = do
|
||||
@@ -1652,6 +1659,7 @@ storeError = \case
|
||||
-- it is used to wrap agent operations when "transaction-like" store access is needed
|
||||
-- NOTE: network IO should NOT be used inside AgentStoreMonad
|
||||
SEAgentError e -> e
|
||||
SEDatabaseBusy e -> CRITICAL True $ B.unpack e
|
||||
e -> INTERNAL $ show e
|
||||
|
||||
incStat :: AgentClient -> Int -> AgentStatsKey -> STM ()
|
||||
|
||||
@@ -30,7 +30,7 @@ import Data.Type.Equality
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.RetryInterval (RI2State)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Crypto.Ratchet (RatchetX448, PQEncryption, PQSupport)
|
||||
import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport, RatchetX448)
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Protocol
|
||||
( MsgBody,
|
||||
@@ -593,6 +593,8 @@ type AsyncCmdId = Int64
|
||||
data StoreError
|
||||
= -- | IO exceptions in store actions.
|
||||
SEInternal ByteString
|
||||
| -- | Database busy
|
||||
SEDatabaseBusy ByteString
|
||||
| -- | Failed to generate unique random ID
|
||||
SEUniqueID
|
||||
| -- | User ID not found
|
||||
|
||||
@@ -221,6 +221,7 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Logger.Simple
|
||||
import Control.Monad
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.IO.Class
|
||||
@@ -268,7 +269,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations (DownMigration (..), MTRE
|
||||
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Crypto.File (CryptoFile (..), CryptoFileArgs (..))
|
||||
import Simplex.Messaging.Crypto.Ratchet (RatchetX448, SkippedMsgDiff (..), SkippedMsgKeys, PQEncryption (..), PQSupport (..))
|
||||
import Simplex.Messaging.Crypto.Ratchet (PQEncryption (..), PQSupport (..), RatchetX448, SkippedMsgDiff (..), SkippedMsgKeys)
|
||||
import qualified Simplex.Messaging.Crypto.Ratchet as CR
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Encoding.String
|
||||
@@ -278,7 +279,7 @@ import Simplex.Messaging.Parsers (blobFieldParser, defaultJSON, dropPrefix, from
|
||||
import Simplex.Messaging.Protocol
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.Transport.Client (TransportHost)
|
||||
import Simplex.Messaging.Util (bshow, catchAllErrors, eitherToMaybe, ifM, safeDecodeUtf8, ($>>=), (<$$>))
|
||||
import Simplex.Messaging.Util (bshow, catchAllErrors, eitherToMaybe, ifM, safeDecodeUtf8, tshow, ($>>=), (<$$>))
|
||||
import Simplex.Messaging.Version.Internal
|
||||
import System.Directory (copyFile, createDirectoryIfMissing, doesFileExist)
|
||||
import System.Exit (exitFailure)
|
||||
@@ -1272,12 +1273,16 @@ createCommand :: DB.Connection -> ACorrId -> ConnId -> Maybe SMPServer -> AgentC
|
||||
createCommand db corrId connId srv_ cmd = runExceptT $ do
|
||||
(host_, port_, serverKeyHash_) <- serverFields
|
||||
createdAt <- liftIO getCurrentTime
|
||||
liftIO $
|
||||
liftIO . E.handle handleErr $
|
||||
DB.execute
|
||||
db
|
||||
"INSERT INTO commands (host, port, corr_id, conn_id, command_tag, command, server_key_hash, created_at) VALUES (?,?,?,?,?,?,?,?)"
|
||||
(host_, port_, corrId, connId, agentCommandTag cmd, cmd, serverKeyHash_, createdAt)
|
||||
(host_, port_, corrId, connId, cmdTag, cmd, serverKeyHash_, createdAt)
|
||||
where
|
||||
cmdTag = agentCommandTag cmd
|
||||
handleErr e
|
||||
| SQL.sqlError e == SQL.ErrorConstraint = logError $ "tried to create command " <> tshow cmdTag <> " for deleted connection"
|
||||
| otherwise = E.throwIO e
|
||||
serverFields :: ExceptT StoreError IO (Maybe (NonEmpty TransportHost), Maybe ServiceName, Maybe C.KeyHash)
|
||||
serverFields = case srv_ of
|
||||
Just srv@(SMPServer host port _) ->
|
||||
|
||||
Reference in New Issue
Block a user