mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-14 16:15:12 +00:00
agent: mark work items failed (#931)
* agent: mark work items failed (WIP) * add tests, created_at * getWorkItem for snd and rcv files * fix * tests * fix * tests * test * tests * rename * fix,refactor * add indexes * update schema * do not try to get more work when resuming an existing worker --------- Co-authored-by: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
1e15d56e92
commit
7ddeca50e4
@@ -99,6 +99,7 @@ library
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230814_indexes
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230829_crypto_files
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231222_command_created_at
|
||||
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items
|
||||
Simplex.Messaging.Agent.TAsyncs
|
||||
Simplex.Messaging.Agent.TRcvQueues
|
||||
Simplex.Messaging.Client
|
||||
|
||||
@@ -908,10 +908,10 @@ sendMessagesB c reqs = withConnLocks c connIds "sendMessages" $ do
|
||||
enqueueCommand :: AgentMonad m => AgentClient -> ACorrId -> ConnId -> Maybe SMPServer -> AgentCommand -> m ()
|
||||
enqueueCommand c corrId connId server aCommand = do
|
||||
withStore c $ \db -> createCommand db corrId connId server aCommand
|
||||
resumeSrvCmds c server
|
||||
void $ getAsyncCmdWorker (\w -> hasWorkToDo w $> w) c server
|
||||
|
||||
resumeSrvCmds :: forall m. AgentMonad' m => AgentClient -> Maybe SMPServer -> m ()
|
||||
resumeSrvCmds = void .: getAsyncCmdWorker
|
||||
resumeSrvCmds = void .: getAsyncCmdWorker pure
|
||||
|
||||
resumeConnCmds :: forall m. AgentMonad m => AgentClient -> ConnId -> m ()
|
||||
resumeConnCmds c connId =
|
||||
@@ -921,12 +921,11 @@ resumeConnCmds c connId =
|
||||
where
|
||||
connQueued = atomically $ isJust <$> TM.lookupInsert connId True (connCmdsQueued c)
|
||||
|
||||
getAsyncCmdWorker :: AgentMonad' m => AgentClient -> Maybe SMPServer -> m Worker
|
||||
getAsyncCmdWorker c server =
|
||||
atomically (getWorker >>= maybe createWorker resumeWorker) >>= \w -> runWorker w $> w
|
||||
getAsyncCmdWorker :: AgentMonad' m => (Worker -> STM Worker) -> AgentClient -> Maybe SMPServer -> m Worker
|
||||
getAsyncCmdWorker whenExists c server =
|
||||
atomically (getWorker >>= maybe createWorker whenExists) >>= \w -> runWorker w $> w
|
||||
where
|
||||
getWorker = TM.lookup server $ asyncCmdWorkers c
|
||||
resumeWorker w = hasWorkToDo w $> w
|
||||
deleteWorker wId = mapM_ $ \w -> when (wId == workerId w) $ TM.delete server $ asyncCmdWorkers c
|
||||
createWorker = do
|
||||
w <- newWorker c
|
||||
@@ -945,7 +944,7 @@ runCommandProcessing c@AgentClient {subQ} server_ doWork = do
|
||||
waitForWork doWork
|
||||
atomically $ throwWhenInactive c
|
||||
atomically $ beginAgentOperation c AOSndNetwork
|
||||
withWork c doWork (\db -> getPendingServerCommand db server_) $ processCmd (riFast ri)
|
||||
withWork c doWork (`getPendingServerCommand` server_) $ processCmd (riFast ri)
|
||||
where
|
||||
processCmd :: RetryInterval -> PendingCommand -> m ()
|
||||
processCmd ri PendingCommand {cmdId, corrId, userId, connId, command} = case command of
|
||||
@@ -1127,15 +1126,14 @@ enqueueSavedMessageB c reqs = do
|
||||
in map (\sq -> createSndMsgDelivery db connId sq mId) sqs
|
||||
|
||||
resumeMsgDelivery :: forall m. AgentMonad' m => AgentClient -> ConnData -> SndQueue -> m ()
|
||||
resumeMsgDelivery = void .:. getDeliveryWorker
|
||||
resumeMsgDelivery = void .:. getDeliveryWorker pure
|
||||
|
||||
getDeliveryWorker :: AgentMonad' m => AgentClient -> ConnData -> SndQueue -> m (Worker, TMVar ())
|
||||
getDeliveryWorker c cData sq = do
|
||||
atomically (getWorker >>= maybe createWorker resumeWorker) >>= \wl -> runWorker wl $> wl
|
||||
getDeliveryWorker :: AgentMonad' m => ((Worker, TMVar ()) -> STM (Worker, TMVar ())) -> AgentClient -> ConnData -> SndQueue -> m (Worker, TMVar ())
|
||||
getDeliveryWorker whenExists c cData sq = do
|
||||
atomically (getWorker >>= maybe createWorker whenExists) >>= \wl -> runWorker wl $> wl
|
||||
where
|
||||
qAddr = qAddress sq
|
||||
getWorker = TM.lookup qAddr $ smpDeliveryWorkers c
|
||||
resumeWorker wl = hasWorkToDo (fst wl) $> wl
|
||||
deleteWorker wId = mapM_ $ \(w, _) -> when (wId == workerId w) $ TM.delete qAddr $ smpDeliveryWorkers c
|
||||
createWorker = do
|
||||
retryLock <- newEmptyTMVar
|
||||
@@ -1150,7 +1148,7 @@ getDeliveryWorker c cData sq = do
|
||||
submitPendingMsg :: AgentMonad' m => AgentClient -> ConnData -> SndQueue -> m ()
|
||||
submitPendingMsg c cData sq = do
|
||||
atomically $ modifyTVar' (msgDeliveryOp c) $ \s -> s {opsInProgress = opsInProgress s + 1}
|
||||
resumeMsgDelivery c cData sq
|
||||
void $ getDeliveryWorker (\wl -> hasWorkToDo (fst wl) $> wl) c cData sq
|
||||
|
||||
runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> TMVar () -> TMVar () -> m ()
|
||||
runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, duplexHandshake} sq doWork qLock = do
|
||||
@@ -2189,7 +2187,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s
|
||||
case findQ addr sqs of
|
||||
Just sq -> do
|
||||
logServer "<--" c srv rId "MSG <QCONT>"
|
||||
atomically $
|
||||
atomically $
|
||||
TM.lookup (qAddress sq) (smpDeliveryWorkers c)
|
||||
>>= mapM_ (\(_, retryLock) -> tryPutTMVar retryLock ())
|
||||
Nothing -> qError "QCONT: queue address not found"
|
||||
|
||||
@@ -139,10 +139,9 @@ import Data.Functor (($>))
|
||||
import Data.List (deleteFirstsBy, foldl', partition, (\\))
|
||||
import Data.List.NonEmpty (NonEmpty (..), (<|))
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import Data.Maybe (isNothing)
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (listToMaybe)
|
||||
import Data.Maybe (isNothing, listToMaybe)
|
||||
import Data.Set (Set)
|
||||
import qualified Data.Set as S
|
||||
import Data.Text (Text)
|
||||
@@ -1241,15 +1240,18 @@ cryptoError = \case
|
||||
waitForWork :: AgentMonad' m => TMVar () -> m ()
|
||||
waitForWork = void . atomically . readTMVar
|
||||
|
||||
withWork :: AgentMonad m => AgentClient -> TMVar () -> (DB.Connection -> IO (Maybe a)) -> (a -> m ()) -> m ()
|
||||
withWork c doWork getWork action = do
|
||||
r <- withStore' c $ \db -> do
|
||||
r' <- getWork db
|
||||
liftIO $ when (isNothing r') $ noWorkToDo doWork
|
||||
pure r'
|
||||
mapM_ action r
|
||||
withWork :: AgentMonad m => AgentClient -> TMVar () -> (DB.Connection -> IO (Either StoreError (Maybe a))) -> (a -> m ()) -> m ()
|
||||
withWork c doWork getWork action =
|
||||
withStore' c getWork >>= \case
|
||||
Right (Just r) -> action r
|
||||
Right Nothing -> noWork
|
||||
Left e@SEWorkItemError {} -> noWork >> notifyErr e
|
||||
Left e -> notifyErr e
|
||||
where
|
||||
noWork = liftIO $ noWorkToDo doWork
|
||||
notifyErr e = atomically $ writeTBQueue (subQ c) ("", "", APC SAEConn $ ERR $ INTERNAL $ show e)
|
||||
|
||||
noWorkToDo :: TMVar () -> IO ()
|
||||
noWorkToDo :: TMVar () -> IO ()
|
||||
noWorkToDo = void . atomically . tryTakeTMVar
|
||||
|
||||
hasWorkToDo :: Worker -> STM ()
|
||||
@@ -1356,14 +1358,15 @@ withStoreCtx_ ctx_ c action = do
|
||||
withStoreBatch :: (AgentMonad' m, Traversable t) => AgentClient -> (DB.Connection -> t (IO (Either AgentErrorType a))) -> m (t (Either AgentErrorType a))
|
||||
withStoreBatch c actions = do
|
||||
st <- asks store
|
||||
liftIO $ agentOperationBracket c AODatabase (\_ -> pure ()) $
|
||||
withTransaction st $ mapM (`E.catch` handleInternal) . actions
|
||||
liftIO . agentOperationBracket c AODatabase (\_ -> pure ()) $
|
||||
withTransaction st $
|
||||
mapM (`E.catch` handleInternal) . actions
|
||||
where
|
||||
handleInternal :: E.SomeException -> IO (Either AgentErrorType a)
|
||||
handleInternal = pure . Left . INTERNAL . show
|
||||
|
||||
withStoreBatch' :: (AgentMonad' m, Traversable t) => AgentClient -> (DB.Connection -> t (IO a)) -> m (t (Either AgentErrorType a))
|
||||
withStoreBatch' c actions = withStoreBatch c $ \db -> fmap Right <$> actions db
|
||||
withStoreBatch' c actions = withStoreBatch c (fmap (fmap Right) . actions)
|
||||
|
||||
storeError :: StoreError -> AgentErrorType
|
||||
storeError = \case
|
||||
|
||||
@@ -1703,8 +1703,8 @@ commandP binaryP =
|
||||
>>= \case
|
||||
ACmdTag SClient e cmd ->
|
||||
ACmd SClient e <$> case cmd of
|
||||
NEW_ -> s (NEW <$> strP_ <*> strP_ <*> strP)
|
||||
JOIN_ -> s (JOIN <$> strP_ <*> strP_ <*> strP_ <*> binaryP)
|
||||
NEW_ -> s (NEW <$> strP_ <*> strP_ <*> (strP <|> pure SMP.SMSubscribe))
|
||||
JOIN_ -> s (JOIN <$> strP_ <*> strP_ <*> (strP_ <|> pure SMP.SMSubscribe) <*> binaryP)
|
||||
LET_ -> s (LET <$> A.takeTill (== ' ') <* A.space <*> binaryP)
|
||||
ACPT_ -> s (ACPT <$> A.takeTill (== ' ') <* A.space <*> binaryP)
|
||||
RJCT_ -> s (RJCT <$> A.takeByteString)
|
||||
|
||||
@@ -645,4 +645,6 @@ data StoreError
|
||||
SEFileNotFound
|
||||
| -- | XFTP Deleted snd chunk replica not found.
|
||||
SEDeletedSndChunkReplicaNotFound
|
||||
| -- | Error when reading work item that suspends worker - do not use!
|
||||
SEWorkItemError ByteString
|
||||
deriving (Eq, Show, Exception)
|
||||
|
||||
@@ -152,7 +152,9 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
setNullNtfSubscriptionAction,
|
||||
deleteNtfSubscription,
|
||||
getNextNtfSubNTFAction,
|
||||
markNtfSubActionNtfFailed_, -- exported for tests
|
||||
getNextNtfSubSMPAction,
|
||||
markNtfSubActionSMPFailed_, -- exported for tests
|
||||
getActiveNtfToken,
|
||||
getNtfRcvQueue,
|
||||
setConnectionNtfs,
|
||||
@@ -191,6 +193,7 @@ module Simplex.Messaging.Agent.Store.SQLite
|
||||
deleteSndFile',
|
||||
getSndFileDeleted,
|
||||
createSndFileReplica,
|
||||
createSndFileReplica_, -- exported for tests
|
||||
getNextSndChunkToUpload,
|
||||
updateSndChunkReplicaDelay,
|
||||
addSndChunkReplicaRecipients,
|
||||
@@ -222,7 +225,7 @@ import Control.Monad.IO.Class
|
||||
import Crypto.Random (ChaChaDRG)
|
||||
import qualified Data.Aeson.TH as J
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.Bifunctor (second)
|
||||
import Data.Bifunctor (first, second)
|
||||
import Data.ByteArray (ScrubbedBytes)
|
||||
import qualified Data.ByteArray as BA
|
||||
import Data.ByteString (ByteString)
|
||||
@@ -271,7 +274,7 @@ import Simplex.Messaging.Parsers (blobFieldParser, defaultJSON, dropPrefix, from
|
||||
import Simplex.Messaging.Protocol
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import Simplex.Messaging.Transport.Client (TransportHost)
|
||||
import Simplex.Messaging.Util (bshow, eitherToMaybe, ifM, safeDecodeUtf8, ($>>=), (<$$>))
|
||||
import Simplex.Messaging.Util (bshow, catchAllErrors, eitherToMaybe, ifM, safeDecodeUtf8, ($>>=), (<$$>))
|
||||
import Simplex.Messaging.Version
|
||||
import System.Directory (copyFile, createDirectoryIfMissing, doesFileExist)
|
||||
import System.Exit (exitFailure)
|
||||
@@ -975,29 +978,60 @@ updateSndMsgRcpt db connId sndMsgId MsgReceipt {agentMsgId, msgRcptStatus} =
|
||||
"UPDATE snd_messages SET rcpt_internal_id = ?, rcpt_status = ? WHERE conn_id = ? AND internal_snd_id = ?"
|
||||
(agentMsgId, msgRcptStatus, connId, sndMsgId)
|
||||
|
||||
getPendingQueueMsg :: DB.Connection -> ConnId -> SndQueue -> IO (Maybe (Maybe RcvQueue, PendingMsgData))
|
||||
getPendingQueueMsg db connId SndQueue {dbQueueId} = do
|
||||
rq_ <- L.head <$$> getRcvQueuesByConnId_ db connId
|
||||
(rq_,) <$$> maybeFirstRow pendingMsgData getMsgData_
|
||||
getPendingQueueMsg :: DB.Connection -> ConnId -> SndQueue -> IO (Either StoreError (Maybe (Maybe RcvQueue, PendingMsgData)))
|
||||
getPendingQueueMsg db connId SndQueue {dbQueueId} =
|
||||
getWorkItem "message" getMsgId getMsgData markMsgFailed
|
||||
where
|
||||
getMsgData_ =
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT m.internal_id, m.msg_type, m.msg_flags, m.msg_body, m.internal_ts, s.retry_int_slow, s.retry_int_fast
|
||||
FROM snd_message_deliveries d
|
||||
JOIN messages m ON m.conn_id = d.conn_id AND m.internal_id = d.internal_id
|
||||
JOIN snd_messages s ON s.conn_id = d.conn_id AND s.internal_id = d.internal_id
|
||||
WHERE d.conn_id = ? AND d.snd_queue_id = ?
|
||||
ORDER BY d.internal_id ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(connId, dbQueueId)
|
||||
pendingMsgData :: (InternalId, AgentMessageType, Maybe MsgFlags, MsgBody, InternalTs, Maybe Int64, Maybe Int64) -> PendingMsgData
|
||||
pendingMsgData (msgId, msgType, msgFlags_, msgBody, internalTs, riSlow_, riFast_) =
|
||||
let msgFlags = fromMaybe SMP.noMsgFlags msgFlags_
|
||||
msgRetryState = RI2State <$> riSlow_ <*> riFast_
|
||||
in PendingMsgData {msgId, msgType, msgFlags, msgBody, msgRetryState, internalTs}
|
||||
getMsgId :: IO (Maybe InternalId)
|
||||
getMsgId =
|
||||
maybeFirstRow fromOnly $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT internal_id
|
||||
FROM snd_message_deliveries d
|
||||
WHERE conn_id = ? AND snd_queue_id = ? AND failed = 0
|
||||
ORDER BY internal_id ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(connId, dbQueueId)
|
||||
getMsgData :: InternalId -> IO (Either StoreError (Maybe RcvQueue, PendingMsgData))
|
||||
getMsgData msgId = runExceptT $ do
|
||||
msg <- ExceptT $ firstRow pendingMsgData err getMsgData_
|
||||
rq_ <- liftIO $ L.head <$$> getRcvQueuesByConnId_ db connId
|
||||
pure (rq_, msg)
|
||||
where
|
||||
getMsgData_ =
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT m.msg_type, m.msg_flags, m.msg_body, m.internal_ts, s.retry_int_slow, s.retry_int_fast
|
||||
FROM messages m
|
||||
JOIN snd_messages s ON s.conn_id = m.conn_id AND s.internal_id = m.internal_id
|
||||
WHERE m.conn_id = ? AND m.internal_id = ?
|
||||
|]
|
||||
(connId, msgId)
|
||||
err = SEInternal $ "msg delivery " <> bshow msgId <> " returned []"
|
||||
pendingMsgData :: (AgentMessageType, Maybe MsgFlags, MsgBody, InternalTs, Maybe Int64, Maybe Int64) -> PendingMsgData
|
||||
pendingMsgData (msgType, msgFlags_, msgBody, internalTs, riSlow_, riFast_) =
|
||||
let msgFlags = fromMaybe SMP.noMsgFlags msgFlags_
|
||||
msgRetryState = RI2State <$> riSlow_ <*> riFast_
|
||||
in PendingMsgData {msgId, msgType, msgFlags, msgBody, msgRetryState, internalTs}
|
||||
markMsgFailed msgId = DB.execute db "UPDATE snd_message_deliveries SET failed = 1 WHERE conn_id = ? AND internal_id = ?" (connId, msgId)
|
||||
|
||||
getWorkItem :: Show i => ByteString -> IO (Maybe i) -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> IO (Either StoreError (Maybe a))
|
||||
getWorkItem itemName getId getItem markFailed =
|
||||
runExceptT $ handleErr "getId" getId >>= mapM tryGetItem
|
||||
where
|
||||
tryGetItem itemId = ExceptT (getItem itemId) `catchStoreErrors` \e -> mark itemId >> throwError e
|
||||
mark itemId = handleErr ("markFailed ID " <> bshow itemId) $ markFailed itemId
|
||||
catchStoreErrors = catchAllErrors (SEInternal . bshow)
|
||||
-- Errors caught by this function will suspend worker as if there is no more work,
|
||||
handleErr :: ByteString -> IO a -> ExceptT StoreError IO a
|
||||
handleErr opName action = ExceptT $ first mkError <$> E.try action
|
||||
where
|
||||
mkError :: E.SomeException -> StoreError
|
||||
mkError e = SEWorkItemError $ itemName <> " " <> opName <> " error: " <> bshow e
|
||||
|
||||
updatePendingMsgRIState :: DB.Connection -> ConnId -> InternalId -> RI2State -> IO ()
|
||||
updatePendingMsgRIState db connId msgId RI2State {slowInterval, fastInterval} =
|
||||
@@ -1095,7 +1129,7 @@ deleteSndMsgDelivery db connId SndQueue {dbQueueId} msgId keepForReceipt = do
|
||||
|
||||
countPendingSndDeliveries_ :: DB.Connection -> ConnId -> InternalId -> IO Int
|
||||
countPendingSndDeliveries_ db connId msgId = do
|
||||
(Only cnt : _) <- DB.query db "SELECT count(*) FROM snd_message_deliveries WHERE conn_id = ? AND internal_id = ?" (connId, msgId)
|
||||
(Only cnt : _) <- DB.query db "SELECT count(*) FROM snd_message_deliveries WHERE conn_id = ? AND internal_id = ? AND failed = 0" (connId, msgId)
|
||||
pure cnt
|
||||
|
||||
deleteRcvMsgHashesExpired :: DB.Connection -> NominalDiffTime -> IO ()
|
||||
@@ -1204,11 +1238,12 @@ updateRatchet db connId rc skipped = do
|
||||
createCommand :: DB.Connection -> ACorrId -> ConnId -> Maybe SMPServer -> AgentCommand -> IO (Either StoreError ())
|
||||
createCommand db corrId connId srv_ cmd = runExceptT $ do
|
||||
(host_, port_, serverKeyHash_) <- serverFields
|
||||
liftIO $ do
|
||||
createdAt <- liftIO getCurrentTime
|
||||
liftIO $
|
||||
DB.execute
|
||||
db
|
||||
"INSERT INTO commands (host, port, corr_id, conn_id, command_tag, command, server_key_hash) VALUES (?,?,?,?,?,?,?)"
|
||||
(host_, port_, corrId, connId, agentCommandTag cmd, cmd, serverKeyHash_)
|
||||
"INSERT INTO commands (host, port, corr_id, conn_id, command_tag, command, server_key_hash, created_at) VALUES (?,?,?,?,?,?,?,?)"
|
||||
(host_, port_, corrId, connId, agentCommandTag cmd, cmd, serverKeyHash_, createdAt)
|
||||
where
|
||||
serverFields :: ExceptT StoreError IO (Maybe (NonEmpty TransportHost), Maybe ServiceName, Maybe C.KeyHash)
|
||||
serverFields = case srv_ of
|
||||
@@ -1235,33 +1270,47 @@ getPendingCommandServers db connId = do
|
||||
where
|
||||
smpServer (host, port, keyHash) = SMPServer <$> host <*> port <*> keyHash
|
||||
|
||||
getPendingServerCommand :: DB.Connection -> Maybe SMPServer -> IO (Maybe PendingCommand)
|
||||
getPendingServerCommand db srv_ = maybeFirstRow pendingCommand $ case srv_ of
|
||||
Nothing ->
|
||||
DB.query_
|
||||
db
|
||||
[sql|
|
||||
SELECT c.command_id, c.corr_id, cs.user_id, c.conn_id, c.command
|
||||
FROM commands c
|
||||
JOIN connections cs USING (conn_id)
|
||||
WHERE c.host IS NULL AND c.port IS NULL
|
||||
ORDER BY c.created_at ASC, c.command_id ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
Just (SMPServer host port _) ->
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT c.command_id, c.corr_id, cs.user_id, c.conn_id, c.command
|
||||
FROM commands c
|
||||
JOIN connections cs USING (conn_id)
|
||||
WHERE c.host = ? AND c.port = ?
|
||||
ORDER BY c.created_at ASC, c.command_id ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(host, port)
|
||||
getPendingServerCommand :: DB.Connection -> Maybe SMPServer -> IO (Either StoreError (Maybe PendingCommand))
|
||||
getPendingServerCommand db srv_ = getWorkItem "command" getCmdId getCommand markCommandFailed
|
||||
where
|
||||
pendingCommand (cmdId, corrId, userId, connId, command) = PendingCommand {cmdId, corrId, userId, connId, command}
|
||||
getCmdId :: IO (Maybe Int64)
|
||||
getCmdId =
|
||||
maybeFirstRow fromOnly $ case srv_ of
|
||||
Nothing ->
|
||||
DB.query_
|
||||
db
|
||||
[sql|
|
||||
SELECT command_id FROM commands
|
||||
WHERE host IS NULL AND port IS NULL AND failed = 0
|
||||
ORDER BY created_at ASC, command_id ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
Just (SMPServer host port _) ->
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT command_id FROM commands
|
||||
WHERE host = ? AND port = ? AND failed = 0
|
||||
ORDER BY created_at ASC, command_id ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(host, port)
|
||||
getCommand :: Int64 -> IO (Either StoreError PendingCommand)
|
||||
getCommand cmdId =
|
||||
firstRow pendingCommand err $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT c.corr_id, cs.user_id, c.conn_id, c.command
|
||||
FROM commands c
|
||||
JOIN connections cs USING (conn_id)
|
||||
WHERE c.command_id = ?
|
||||
|]
|
||||
(Only cmdId)
|
||||
where
|
||||
err = SEInternal $ "command " <> bshow cmdId <> " returned []"
|
||||
pendingCommand (corrId, userId, connId, command) = PendingCommand {cmdId, corrId, userId, connId, command}
|
||||
markCommandFailed cmdId = DB.execute db "UPDATE commands SET failed = 1 WHERE command_id = ?" (Only cmdId)
|
||||
|
||||
deleteCommand :: DB.Connection -> AsyncCmdId -> IO ()
|
||||
deleteCommand db cmdId =
|
||||
@@ -1486,53 +1535,95 @@ deleteNtfSubscription db connId = do
|
||||
(Nothing :: Maybe SMP.NotifierId, Nothing :: Maybe NtfSubscriptionId, NASDeleted, False, updatedAt, connId)
|
||||
else DB.execute db "DELETE FROM ntf_subscriptions WHERE conn_id = ?" (Only connId)
|
||||
|
||||
getNextNtfSubNTFAction :: DB.Connection -> NtfServer -> IO (Maybe (NtfSubscription, NtfSubNTFAction, NtfActionTs))
|
||||
getNextNtfSubNTFAction db ntfServer@(NtfServer ntfHost ntfPort _) = do
|
||||
maybeFirstRow ntfSubAction getNtfSubAction_ $>>= \a@(NtfSubscription {connId}, _, _) -> do
|
||||
DB.execute db "UPDATE ntf_subscriptions SET updated_by_supervisor = ? WHERE conn_id = ?" (False, connId)
|
||||
pure $ Just a
|
||||
getNextNtfSubNTFAction :: DB.Connection -> NtfServer -> IO (Either StoreError (Maybe (NtfSubscription, NtfSubNTFAction, NtfActionTs)))
|
||||
getNextNtfSubNTFAction db ntfServer@(NtfServer ntfHost ntfPort _) =
|
||||
getWorkItem "ntf NTF" getNtfConnId getNtfSubAction (markNtfSubActionNtfFailed_ db)
|
||||
where
|
||||
getNtfSubAction_ =
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT ns.conn_id, s.host, s.port, COALESCE(ns.smp_server_key_hash, s.key_hash),
|
||||
ns.smp_ntf_id, ns.ntf_sub_id, ns.ntf_sub_status, ns.ntf_sub_action_ts, ns.ntf_sub_action
|
||||
FROM ntf_subscriptions ns
|
||||
JOIN servers s ON s.host = ns.smp_host AND s.port = ns.smp_port
|
||||
WHERE ns.ntf_host = ? AND ns.ntf_port = ? AND ns.ntf_sub_action IS NOT NULL
|
||||
ORDER BY ns.ntf_sub_action_ts ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(ntfHost, ntfPort)
|
||||
ntfSubAction (connId, smpHost, smpPort, smpKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, action) =
|
||||
let smpServer = SMPServer smpHost smpPort smpKeyHash
|
||||
ntfSubscription = NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus}
|
||||
in (ntfSubscription, action, actionTs)
|
||||
getNtfConnId :: IO (Maybe ConnId)
|
||||
getNtfConnId =
|
||||
maybeFirstRow fromOnly $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT conn_id
|
||||
FROM ntf_subscriptions
|
||||
WHERE ntf_host = ? AND ntf_port = ? AND ntf_sub_action IS NOT NULL
|
||||
AND (ntf_failed = 0 OR updated_by_supervisor = 1)
|
||||
ORDER BY ntf_sub_action_ts ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(ntfHost, ntfPort)
|
||||
getNtfSubAction :: ConnId -> IO (Either StoreError (NtfSubscription, NtfSubNTFAction, NtfActionTs))
|
||||
getNtfSubAction connId = do
|
||||
markUpdatedByWorker db connId
|
||||
firstRow ntfSubAction err $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT s.host, s.port, COALESCE(ns.smp_server_key_hash, s.key_hash),
|
||||
ns.smp_ntf_id, ns.ntf_sub_id, ns.ntf_sub_status, ns.ntf_sub_action_ts, ns.ntf_sub_action
|
||||
FROM ntf_subscriptions ns
|
||||
JOIN servers s ON s.host = ns.smp_host AND s.port = ns.smp_port
|
||||
WHERE ns.conn_id = ?
|
||||
|]
|
||||
(Only connId)
|
||||
where
|
||||
err = SEInternal $ "ntf subscription " <> bshow connId <> " returned []"
|
||||
ntfSubAction (smpHost, smpPort, smpKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, action) =
|
||||
let smpServer = SMPServer smpHost smpPort smpKeyHash
|
||||
ntfSubscription = NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus}
|
||||
in (ntfSubscription, action, actionTs)
|
||||
|
||||
getNextNtfSubSMPAction :: DB.Connection -> SMPServer -> IO (Maybe (NtfSubscription, NtfSubSMPAction, NtfActionTs))
|
||||
getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) = do
|
||||
maybeFirstRow ntfSubAction getNtfSubAction_ $>>= \a@(NtfSubscription {connId}, _, _) -> do
|
||||
DB.execute db "UPDATE ntf_subscriptions SET updated_by_supervisor = ? WHERE conn_id = ?" (False, connId)
|
||||
pure $ Just a
|
||||
markNtfSubActionNtfFailed_ :: DB.Connection -> ConnId -> IO ()
|
||||
markNtfSubActionNtfFailed_ db connId =
|
||||
DB.execute db "UPDATE ntf_subscriptions SET ntf_failed = 1 where conn_id = ?" (Only connId)
|
||||
|
||||
getNextNtfSubSMPAction :: DB.Connection -> SMPServer -> IO (Either StoreError (Maybe (NtfSubscription, NtfSubSMPAction, NtfActionTs)))
|
||||
getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) =
|
||||
getWorkItem "ntf SMP" getNtfConnId getNtfSubAction (markNtfSubActionSMPFailed_ db)
|
||||
where
|
||||
getNtfSubAction_ =
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT ns.conn_id, s.ntf_host, s.ntf_port, s.ntf_key_hash,
|
||||
ns.smp_ntf_id, ns.ntf_sub_id, ns.ntf_sub_status, ns.ntf_sub_action_ts, ns.ntf_sub_smp_action
|
||||
FROM ntf_subscriptions ns
|
||||
JOIN ntf_servers s USING (ntf_host, ntf_port)
|
||||
WHERE ns.smp_host = ? AND ns.smp_port = ? AND ns.ntf_sub_smp_action IS NOT NULL AND ns.ntf_sub_action_ts IS NOT NULL
|
||||
ORDER BY ns.ntf_sub_action_ts ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(smpHost, smpPort)
|
||||
ntfSubAction (connId, ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, action) =
|
||||
let ntfServer = NtfServer ntfHost ntfPort ntfKeyHash
|
||||
ntfSubscription = NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus}
|
||||
in (ntfSubscription, action, actionTs)
|
||||
getNtfConnId :: IO (Maybe ConnId)
|
||||
getNtfConnId =
|
||||
maybeFirstRow fromOnly $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT conn_id
|
||||
FROM ntf_subscriptions ns
|
||||
WHERE smp_host = ? AND smp_port = ? AND ntf_sub_smp_action IS NOT NULL AND ntf_sub_action_ts IS NOT NULL
|
||||
AND (smp_failed = 0 OR updated_by_supervisor = 1)
|
||||
ORDER BY ntf_sub_action_ts ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(smpHost, smpPort)
|
||||
getNtfSubAction :: ConnId -> IO (Either StoreError (NtfSubscription, NtfSubSMPAction, NtfActionTs))
|
||||
getNtfSubAction connId = do
|
||||
markUpdatedByWorker db connId
|
||||
firstRow ntfSubAction err $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT s.ntf_host, s.ntf_port, s.ntf_key_hash,
|
||||
ns.smp_ntf_id, ns.ntf_sub_id, ns.ntf_sub_status, ns.ntf_sub_action_ts, ns.ntf_sub_smp_action
|
||||
FROM ntf_subscriptions ns
|
||||
JOIN ntf_servers s USING (ntf_host, ntf_port)
|
||||
WHERE ns.conn_id = ?
|
||||
|]
|
||||
(Only connId)
|
||||
where
|
||||
err = SEInternal $ "ntf subscription " <> bshow connId <> " returned []"
|
||||
ntfSubAction (ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, action) =
|
||||
let ntfServer = NtfServer ntfHost ntfPort ntfKeyHash
|
||||
ntfSubscription = NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus}
|
||||
in (ntfSubscription, action, actionTs)
|
||||
|
||||
markNtfSubActionSMPFailed_ :: DB.Connection -> ConnId -> IO ()
|
||||
markNtfSubActionSMPFailed_ db connId =
|
||||
DB.execute db "UPDATE ntf_subscriptions SET smp_failed = 1 where conn_id = ?" (Only connId)
|
||||
|
||||
markUpdatedByWorker :: DB.Connection -> ConnId -> IO ()
|
||||
markUpdatedByWorker db connId =
|
||||
DB.execute db "UPDATE ntf_subscriptions SET updated_by_supervisor = 0 WHERE conn_id = ?" (Only connId)
|
||||
|
||||
getActiveNtfToken :: DB.Connection -> IO (Maybe NtfToken)
|
||||
getActiveNtfToken db =
|
||||
@@ -2289,60 +2380,84 @@ deleteRcvFile' :: DB.Connection -> DBRcvFileId -> IO ()
|
||||
deleteRcvFile' db rcvFileId =
|
||||
DB.execute db "DELETE FROM rcv_files WHERE rcv_file_id = ?" (Only rcvFileId)
|
||||
|
||||
getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Maybe RcvFileChunk)
|
||||
getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Either StoreError (Maybe RcvFileChunk))
|
||||
getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = do
|
||||
cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime
|
||||
maybeFirstRow toChunk $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT
|
||||
f.rcv_file_id, f.rcv_file_entity_id, f.user_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path,
|
||||
r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.delay, r.retries
|
||||
FROM rcv_file_chunk_replicas r
|
||||
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
|
||||
JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id
|
||||
JOIN rcv_files f ON f.rcv_file_id = c.rcv_file_id
|
||||
WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ?
|
||||
AND r.received = 0 AND r.replica_number = 1
|
||||
AND f.status = ? AND f.deleted = 0 AND f.created_at >= ?
|
||||
ORDER BY r.created_at ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(host, port, keyHash, RFSReceiving, cutoffTs)
|
||||
getWorkItem "rcv_file_download" getReplicaId getChunkData (markRcvFileFailed db . snd)
|
||||
where
|
||||
toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Maybe Int64, Int)) -> RcvFileChunk
|
||||
toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries)) =
|
||||
RcvFileChunk
|
||||
{ rcvFileId,
|
||||
rcvFileEntityId,
|
||||
userId,
|
||||
rcvChunkId,
|
||||
chunkNo,
|
||||
chunkSize,
|
||||
digest,
|
||||
fileTmpPath,
|
||||
chunkTmpPath,
|
||||
replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, delay, retries}]
|
||||
}
|
||||
getReplicaId :: IO (Maybe (Int64, DBRcvFileId))
|
||||
getReplicaId = do
|
||||
cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime
|
||||
maybeFirstRow id $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT r.rcv_file_chunk_replica_id, f.rcv_file_id
|
||||
FROM rcv_file_chunk_replicas r
|
||||
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
|
||||
JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id
|
||||
JOIN rcv_files f ON f.rcv_file_id = c.rcv_file_id
|
||||
WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ?
|
||||
AND r.received = 0 AND r.replica_number = 1
|
||||
AND f.status = ? AND f.deleted = 0 AND f.created_at >= ?
|
||||
AND f.failed = 0
|
||||
ORDER BY r.created_at ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(host, port, keyHash, RFSReceiving, cutoffTs)
|
||||
getChunkData :: (Int64, DBRcvFileId) -> IO (Either StoreError RcvFileChunk)
|
||||
getChunkData (rcvFileChunkReplicaId, _fileId) =
|
||||
firstRow toChunk SEFileNotFound $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT
|
||||
f.rcv_file_id, f.rcv_file_entity_id, f.user_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path,
|
||||
r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.delay, r.retries
|
||||
FROM rcv_file_chunk_replicas r
|
||||
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
|
||||
JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id
|
||||
JOIN rcv_files f ON f.rcv_file_id = c.rcv_file_id
|
||||
WHERE r.rcv_file_chunk_replica_id = ?
|
||||
|]
|
||||
(Only rcvFileChunkReplicaId)
|
||||
where
|
||||
toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Maybe Int64, Int)) -> RcvFileChunk
|
||||
toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries)) =
|
||||
RcvFileChunk
|
||||
{ rcvFileId,
|
||||
rcvFileEntityId,
|
||||
userId,
|
||||
rcvChunkId,
|
||||
chunkNo,
|
||||
chunkSize,
|
||||
digest,
|
||||
fileTmpPath,
|
||||
chunkTmpPath,
|
||||
replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, delay, retries}]
|
||||
}
|
||||
|
||||
getNextRcvFileToDecrypt :: DB.Connection -> NominalDiffTime -> IO (Maybe RcvFile)
|
||||
getNextRcvFileToDecrypt db ttl = do
|
||||
cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime
|
||||
fileId_ :: Maybe DBRcvFileId <-
|
||||
maybeFirstRow fromOnly $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT rcv_file_id
|
||||
FROM rcv_files
|
||||
WHERE status IN (?,?) AND deleted = 0 AND created_at >= ?
|
||||
ORDER BY created_at ASC LIMIT 1
|
||||
|]
|
||||
(RFSReceived, RFSDecrypting, cutoffTs)
|
||||
case fileId_ of
|
||||
Nothing -> pure Nothing
|
||||
Just fileId -> eitherToMaybe <$> getRcvFile db fileId
|
||||
getNextRcvFileToDecrypt :: DB.Connection -> NominalDiffTime -> IO (Either StoreError (Maybe RcvFile))
|
||||
getNextRcvFileToDecrypt db ttl =
|
||||
getWorkItem "rcv_file_decrypt" getFileId (getRcvFile db) (markRcvFileFailed db)
|
||||
where
|
||||
getFileId :: IO (Maybe DBRcvFileId)
|
||||
getFileId = do
|
||||
cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime
|
||||
maybeFirstRow fromOnly $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT rcv_file_id
|
||||
FROM rcv_files
|
||||
WHERE status IN (?,?) AND deleted = 0 AND created_at >= ?
|
||||
AND failed = 0
|
||||
ORDER BY created_at ASC LIMIT 1
|
||||
|]
|
||||
(RFSReceived, RFSDecrypting, cutoffTs)
|
||||
|
||||
markRcvFileFailed :: DB.Connection -> DBRcvFileId -> IO ()
|
||||
markRcvFileFailed db fileId = do
|
||||
DB.execute db "UPDATE rcv_files SET failed = 1 WHERE rcv_file_id = ?" (Only fileId)
|
||||
|
||||
getPendingRcvFilesServers :: DB.Connection -> NominalDiffTime -> IO [XFTPServer]
|
||||
getPendingRcvFilesServers db ttl = do
|
||||
@@ -2494,23 +2609,28 @@ getChunkReplicaRecipients_ db replicaId =
|
||||
|]
|
||||
(Only replicaId)
|
||||
|
||||
getNextSndFileToPrepare :: DB.Connection -> NominalDiffTime -> IO (Maybe SndFile)
|
||||
getNextSndFileToPrepare db ttl = do
|
||||
cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime
|
||||
fileId_ :: Maybe DBSndFileId <-
|
||||
maybeFirstRow fromOnly $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT snd_file_id
|
||||
FROM snd_files
|
||||
WHERE status IN (?,?,?) AND deleted = 0 AND created_at >= ?
|
||||
ORDER BY created_at ASC LIMIT 1
|
||||
|]
|
||||
(SFSNew, SFSEncrypting, SFSEncrypted, cutoffTs)
|
||||
case fileId_ of
|
||||
Nothing -> pure Nothing
|
||||
Just fileId -> eitherToMaybe <$> getSndFile db fileId
|
||||
getNextSndFileToPrepare :: DB.Connection -> NominalDiffTime -> IO (Either StoreError (Maybe SndFile))
|
||||
getNextSndFileToPrepare db ttl =
|
||||
getWorkItem "snd_file_prepare" getFileId (getSndFile db) (markSndFileFailed db)
|
||||
where
|
||||
getFileId :: IO (Maybe DBSndFileId)
|
||||
getFileId = do
|
||||
cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime
|
||||
maybeFirstRow fromOnly $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT snd_file_id
|
||||
FROM snd_files
|
||||
WHERE status IN (?,?,?) AND deleted = 0 AND created_at >= ?
|
||||
AND failed = 0
|
||||
ORDER BY created_at ASC LIMIT 1
|
||||
|]
|
||||
(SFSNew, SFSEncrypting, SFSEncrypted, cutoffTs)
|
||||
|
||||
markSndFileFailed :: DB.Connection -> DBSndFileId -> IO ()
|
||||
markSndFileFailed db fileId =
|
||||
DB.execute db "UPDATE snd_files SET failed = 1 WHERE snd_file_id = ?" (Only fileId)
|
||||
|
||||
updateSndFileError :: DB.Connection -> DBSndFileId -> String -> IO ()
|
||||
updateSndFileError db sndFileId errStr = do
|
||||
@@ -2554,7 +2674,10 @@ getSndFileDeleted db sndFileId =
|
||||
<$> maybeFirstRow fromOnly (DB.query db "SELECT deleted FROM snd_files WHERE snd_file_id = ?" (Only sndFileId))
|
||||
|
||||
createSndFileReplica :: DB.Connection -> SndFileChunk -> NewSndChunkReplica -> IO ()
|
||||
createSndFileReplica db SndFileChunk {sndChunkId} NewSndChunkReplica {server, replicaId, replicaKey, rcvIdsKeys} = do
|
||||
createSndFileReplica db SndFileChunk {sndChunkId} = createSndFileReplica_ db sndChunkId
|
||||
|
||||
createSndFileReplica_ :: DB.Connection -> Int64 -> NewSndChunkReplica -> IO ()
|
||||
createSndFileReplica_ db sndChunkId NewSndChunkReplica {server, replicaId, replicaKey, rcvIdsKeys} = do
|
||||
srvId <- createXFTPServer_ db server
|
||||
DB.execute
|
||||
db
|
||||
@@ -2575,50 +2698,69 @@ createSndFileReplica db SndFileChunk {sndChunkId} NewSndChunkReplica {server, re
|
||||
|]
|
||||
(rId, rcvId, rcvKey)
|
||||
|
||||
getNextSndChunkToUpload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Maybe SndFileChunk)
|
||||
getNextSndChunkToUpload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Either StoreError (Maybe SndFileChunk))
|
||||
getNextSndChunkToUpload db server@ProtocolServer {host, port, keyHash} ttl = do
|
||||
cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime
|
||||
chunk_ <-
|
||||
maybeFirstRow toChunk $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT
|
||||
f.snd_file_id, f.snd_file_entity_id, f.user_id, f.num_recipients, f.prefix_path,
|
||||
c.snd_file_chunk_id, c.chunk_no, c.chunk_offset, c.chunk_size, c.digest,
|
||||
r.snd_file_chunk_replica_id, r.replica_id, r.replica_key, r.replica_status, r.delay, r.retries
|
||||
FROM snd_file_chunk_replicas r
|
||||
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
|
||||
JOIN snd_file_chunks c ON c.snd_file_chunk_id = r.snd_file_chunk_id
|
||||
JOIN snd_files f ON f.snd_file_id = c.snd_file_id
|
||||
WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ?
|
||||
AND r.replica_status = ? AND r.replica_number = 1
|
||||
AND (f.status = ? OR f.status = ?) AND f.deleted = 0 AND f.created_at >= ?
|
||||
ORDER BY r.created_at ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(host, port, keyHash, SFRSCreated, SFSEncrypted, SFSUploading, cutoffTs)
|
||||
forM chunk_ $ \chunk@SndFileChunk {replicas} -> do
|
||||
replicas' <- forM replicas $ \replica@SndFileChunkReplica {sndChunkReplicaId} -> do
|
||||
rcvIdsKeys <- getChunkReplicaRecipients_ db sndChunkReplicaId
|
||||
pure (replica :: SndFileChunkReplica) {rcvIdsKeys}
|
||||
pure (chunk {replicas = replicas'} :: SndFileChunk)
|
||||
getWorkItem "snd_file_upload" getReplicaId getChunkData (markSndFileFailed db . snd)
|
||||
where
|
||||
toChunk :: ((DBSndFileId, SndFileId, UserId, Int, FilePath) :. (Int64, Int, Int64, Word32, FileDigest) :. (Int64, ChunkReplicaId, C.APrivateSignKey, SndFileReplicaStatus, Maybe Int64, Int)) -> SndFileChunk
|
||||
toChunk ((sndFileId, sndFileEntityId, userId, numRecipients, filePrefixPath) :. (sndChunkId, chunkNo, chunkOffset, chunkSize, digest) :. (sndChunkReplicaId, replicaId, replicaKey, replicaStatus, delay, retries)) =
|
||||
let chunkSpec = XFTPChunkSpec {filePath = sndFileEncPath filePrefixPath, chunkOffset, chunkSize}
|
||||
in SndFileChunk
|
||||
{ sndFileId,
|
||||
sndFileEntityId,
|
||||
userId,
|
||||
numRecipients,
|
||||
sndChunkId,
|
||||
chunkNo,
|
||||
chunkSpec,
|
||||
digest,
|
||||
filePrefixPath,
|
||||
replicas = [SndFileChunkReplica {sndChunkReplicaId, server, replicaId, replicaKey, replicaStatus, delay, retries, rcvIdsKeys = []}]
|
||||
}
|
||||
getReplicaId :: IO (Maybe (Int64, DBSndFileId))
|
||||
getReplicaId = do
|
||||
cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime
|
||||
maybeFirstRow id $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT r.snd_file_chunk_replica_id, f.snd_file_id
|
||||
FROM snd_file_chunk_replicas r
|
||||
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
|
||||
JOIN snd_file_chunks c ON c.snd_file_chunk_id = r.snd_file_chunk_id
|
||||
JOIN snd_files f ON f.snd_file_id = c.snd_file_id
|
||||
WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ?
|
||||
AND r.replica_status = ? AND r.replica_number = 1
|
||||
AND (f.status = ? OR f.status = ?) AND f.deleted = 0 AND f.created_at >= ?
|
||||
AND f.failed = 0
|
||||
ORDER BY r.created_at ASC
|
||||
LIMIT 1
|
||||
|]
|
||||
(host, port, keyHash, SFRSCreated, SFSEncrypted, SFSUploading, cutoffTs)
|
||||
getChunkData :: (Int64, DBSndFileId) -> IO (Either StoreError SndFileChunk)
|
||||
getChunkData (sndFileChunkReplicaId, _fileId) = do
|
||||
chunk_ <-
|
||||
firstRow toChunk SEFileNotFound $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT
|
||||
f.snd_file_id, f.snd_file_entity_id, f.user_id, f.num_recipients, f.prefix_path,
|
||||
c.snd_file_chunk_id, c.chunk_no, c.chunk_offset, c.chunk_size, c.digest,
|
||||
r.snd_file_chunk_replica_id, r.replica_id, r.replica_key, r.replica_status, r.delay, r.retries
|
||||
FROM snd_file_chunk_replicas r
|
||||
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
|
||||
JOIN snd_file_chunks c ON c.snd_file_chunk_id = r.snd_file_chunk_id
|
||||
JOIN snd_files f ON f.snd_file_id = c.snd_file_id
|
||||
WHERE r.snd_file_chunk_replica_id = ?
|
||||
|]
|
||||
(Only sndFileChunkReplicaId)
|
||||
forM chunk_ $ \chunk@SndFileChunk {replicas} -> do
|
||||
replicas' <- forM replicas $ \replica@SndFileChunkReplica {sndChunkReplicaId} -> do
|
||||
rcvIdsKeys <- getChunkReplicaRecipients_ db sndChunkReplicaId
|
||||
pure (replica :: SndFileChunkReplica) {rcvIdsKeys}
|
||||
pure (chunk {replicas = replicas'} :: SndFileChunk)
|
||||
where
|
||||
toChunk :: ((DBSndFileId, SndFileId, UserId, Int, FilePath) :. (Int64, Int, Int64, Word32, FileDigest) :. (Int64, ChunkReplicaId, C.APrivateSignKey, SndFileReplicaStatus, Maybe Int64, Int)) -> SndFileChunk
|
||||
toChunk ((sndFileId, sndFileEntityId, userId, numRecipients, filePrefixPath) :. (sndChunkId, chunkNo, chunkOffset, chunkSize, digest) :. (sndChunkReplicaId, replicaId, replicaKey, replicaStatus, delay, retries)) =
|
||||
let chunkSpec = XFTPChunkSpec {filePath = sndFileEncPath filePrefixPath, chunkOffset, chunkSize}
|
||||
in SndFileChunk
|
||||
{ sndFileId,
|
||||
sndFileEntityId,
|
||||
userId,
|
||||
numRecipients,
|
||||
sndChunkId,
|
||||
chunkNo,
|
||||
chunkSpec,
|
||||
digest,
|
||||
filePrefixPath,
|
||||
replicas = [SndFileChunkReplica {sndChunkReplicaId, server, replicaId, replicaKey, replicaStatus, delay, retries, rcvIdsKeys = []}]
|
||||
}
|
||||
|
||||
updateSndChunkReplicaDelay :: DB.Connection -> Int64 -> Int64 -> IO ()
|
||||
updateSndChunkReplicaDelay db replicaId delay = do
|
||||
@@ -2723,25 +2865,29 @@ getDeletedSndChunkReplica db deletedSndChunkReplicaId =
|
||||
let server = XFTPServer host port keyHash
|
||||
in DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, replicaId, replicaKey, chunkDigest, delay, retries}
|
||||
|
||||
getNextDeletedSndChunkReplica :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Maybe DeletedSndChunkReplica)
|
||||
getNextDeletedSndChunkReplica db ProtocolServer {host, port, keyHash} ttl = do
|
||||
cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime
|
||||
replicaId_ :: Maybe Int64 <-
|
||||
maybeFirstRow fromOnly $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT r.deleted_snd_chunk_replica_id
|
||||
FROM deleted_snd_chunk_replicas r
|
||||
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
|
||||
WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ?
|
||||
AND r.created_at >= ?
|
||||
ORDER BY r.created_at ASC LIMIT 1
|
||||
|]
|
||||
(host, port, keyHash, cutoffTs)
|
||||
case replicaId_ of
|
||||
Nothing -> pure Nothing
|
||||
Just replicaId -> eitherToMaybe <$> getDeletedSndChunkReplica db replicaId
|
||||
getNextDeletedSndChunkReplica :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Either StoreError (Maybe DeletedSndChunkReplica))
|
||||
getNextDeletedSndChunkReplica db ProtocolServer {host, port, keyHash} ttl =
|
||||
getWorkItem "deleted replica" getReplicaId (getDeletedSndChunkReplica db) markReplicaFailed
|
||||
where
|
||||
getReplicaId :: IO (Maybe Int64)
|
||||
getReplicaId = do
|
||||
cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime
|
||||
maybeFirstRow fromOnly $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT r.deleted_snd_chunk_replica_id
|
||||
FROM deleted_snd_chunk_replicas r
|
||||
JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id
|
||||
WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ?
|
||||
AND r.created_at >= ?
|
||||
AND failed = 0
|
||||
ORDER BY r.created_at ASC LIMIT 1
|
||||
|]
|
||||
(host, port, keyHash, cutoffTs)
|
||||
markReplicaFailed :: Int64 -> IO ()
|
||||
markReplicaFailed replicaId = do
|
||||
DB.execute db "UPDATE deleted_snd_chunk_replicas SET failed = 1 WHERE deleted_snd_chunk_replica_id = ?" (Only replicaId)
|
||||
|
||||
updateDeletedSndChunkReplicaDelay :: DB.Connection -> Int64 -> Int64 -> IO ()
|
||||
updateDeletedSndChunkReplicaDelay db deletedSndChunkReplicaId delay = do
|
||||
|
||||
@@ -66,6 +66,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230722_indexes
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230814_indexes
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230829_crypto_files
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231222_command_created_at
|
||||
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items
|
||||
import Simplex.Messaging.Encoding.String
|
||||
import Simplex.Messaging.Parsers (dropPrefix, sumTypeJSON)
|
||||
import Simplex.Messaging.Transport.Client (TransportHost)
|
||||
@@ -100,7 +101,8 @@ schemaMigrations =
|
||||
("m20230722_indexes", m20230722_indexes, Just down_m20230722_indexes),
|
||||
("m20230814_indexes", m20230814_indexes, Just down_m20230814_indexes),
|
||||
("m20230829_crypto_files", m20230829_crypto_files, Just down_m20230829_crypto_files),
|
||||
("m20231222_command_created_at", m20231222_command_created_at, Just down_m20231222_command_created_at)
|
||||
("m20231222_command_created_at", m20231222_command_created_at, Just down_m20231222_command_created_at),
|
||||
("m20231225_failed_work_items", m20231225_failed_work_items, Just down_m20231225_failed_work_items)
|
||||
]
|
||||
|
||||
-- | The list of migrations in ascending order by date
|
||||
|
||||
@@ -0,0 +1,38 @@
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items where
|
||||
|
||||
import Database.SQLite.Simple (Query)
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
|
||||
m20231225_failed_work_items :: Query
|
||||
m20231225_failed_work_items =
|
||||
[sql|
|
||||
ALTER TABLE snd_message_deliveries ADD COLUMN failed INTEGER DEFAULT 0;
|
||||
ALTER TABLE commands ADD COLUMN failed INTEGER DEFAULT 0;
|
||||
ALTER TABLE ntf_subscriptions ADD COLUMN ntf_failed INTEGER DEFAULT 0;
|
||||
ALTER TABLE ntf_subscriptions ADD COLUMN smp_failed INTEGER DEFAULT 0;
|
||||
ALTER TABLE rcv_files ADD COLUMN failed INTEGER DEFAULT 0;
|
||||
ALTER TABLE snd_files ADD COLUMN failed INTEGER DEFAULT 0;
|
||||
ALTER TABLE deleted_snd_chunk_replicas ADD COLUMN failed INTEGER DEFAULT 0;
|
||||
|
||||
CREATE INDEX idx_rcv_files_status_created_at ON rcv_files(status, created_at);
|
||||
CREATE INDEX idx_snd_files_status_created_at ON snd_files(status, created_at);
|
||||
CREATE INDEX idx_snd_files_snd_file_entity_id ON snd_files(snd_file_entity_id);
|
||||
|]
|
||||
|
||||
down_m20231225_failed_work_items :: Query
|
||||
down_m20231225_failed_work_items =
|
||||
[sql|
|
||||
DROP INDEX idx_rcv_files_status_created_at;
|
||||
DROP INDEX idx_snd_files_status_created_at;
|
||||
DROP INDEX idx_snd_files_snd_file_entity_id;
|
||||
|
||||
ALTER TABLE snd_message_deliveries DROP COLUMN failed;
|
||||
ALTER TABLE commands DROP COLUMN failed;
|
||||
ALTER TABLE ntf_subscriptions DROP COLUMN ntf_failed;
|
||||
ALTER TABLE ntf_subscriptions DROP COLUMN smp_failed;
|
||||
ALTER TABLE rcv_files DROP COLUMN failed;
|
||||
ALTER TABLE snd_files DROP COLUMN failed;
|
||||
ALTER TABLE deleted_snd_chunk_replicas DROP COLUMN failed;
|
||||
|]
|
||||
@@ -213,6 +213,8 @@ CREATE TABLE ntf_subscriptions(
|
||||
created_at TEXT NOT NULL DEFAULT(datetime('now')),
|
||||
updated_at TEXT NOT NULL DEFAULT(datetime('now')),
|
||||
smp_server_key_hash BLOB,
|
||||
ntf_failed INTEGER DEFAULT 0,
|
||||
smp_failed INTEGER DEFAULT 0,
|
||||
PRIMARY KEY(conn_id),
|
||||
FOREIGN KEY(smp_host, smp_port) REFERENCES servers(host, port)
|
||||
ON DELETE SET NULL ON UPDATE CASCADE,
|
||||
@@ -230,6 +232,7 @@ CREATE TABLE commands(
|
||||
agent_version INTEGER NOT NULL DEFAULT 1,
|
||||
server_key_hash BLOB,
|
||||
created_at TEXT NOT NULL DEFAULT('1970-01-01 00:00:00'),
|
||||
failed INTEGER DEFAULT 0,
|
||||
FOREIGN KEY(host, port) REFERENCES servers
|
||||
ON DELETE RESTRICT ON UPDATE CASCADE
|
||||
);
|
||||
@@ -238,6 +241,7 @@ CREATE TABLE snd_message_deliveries(
|
||||
conn_id BLOB NOT NULL REFERENCES connections ON DELETE CASCADE,
|
||||
snd_queue_id INTEGER NOT NULL,
|
||||
internal_id INTEGER NOT NULL,
|
||||
failed INTEGER DEFAULT 0,
|
||||
FOREIGN KEY(conn_id, internal_id) REFERENCES messages ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED
|
||||
);
|
||||
CREATE TABLE sqlite_sequence(name,seq);
|
||||
@@ -274,6 +278,7 @@ CREATE TABLE rcv_files(
|
||||
updated_at TEXT NOT NULL DEFAULT(datetime('now')),
|
||||
save_file_key BLOB,
|
||||
save_file_nonce BLOB,
|
||||
failed INTEGER DEFAULT 0,
|
||||
UNIQUE(rcv_file_entity_id)
|
||||
);
|
||||
CREATE TABLE rcv_file_chunks(
|
||||
@@ -316,7 +321,8 @@ CREATE TABLE snd_files(
|
||||
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
|
||||
,
|
||||
src_file_key BLOB,
|
||||
src_file_nonce BLOB
|
||||
src_file_nonce BLOB,
|
||||
failed INTEGER DEFAULT 0
|
||||
);
|
||||
CREATE TABLE snd_file_chunks(
|
||||
snd_file_chunk_id INTEGER PRIMARY KEY,
|
||||
@@ -360,6 +366,8 @@ CREATE TABLE deleted_snd_chunk_replicas(
|
||||
retries INTEGER NOT NULL DEFAULT 0,
|
||||
created_at TEXT NOT NULL DEFAULT(datetime('now')),
|
||||
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
|
||||
,
|
||||
failed INTEGER DEFAULT 0
|
||||
);
|
||||
CREATE TABLE encrypted_rcv_message_hashes(
|
||||
encrypted_rcv_message_hash_id INTEGER PRIMARY KEY,
|
||||
@@ -486,3 +494,6 @@ CREATE INDEX idx_commands_server_commands ON commands(
|
||||
created_at,
|
||||
command_id
|
||||
);
|
||||
CREATE INDEX idx_rcv_files_status_created_at ON rcv_files(status, created_at);
|
||||
CREATE INDEX idx_snd_files_status_created_at ON snd_files(status, created_at);
|
||||
CREATE INDEX idx_snd_files_snd_file_entity_id ON snd_files(snd_file_entity_id);
|
||||
|
||||
+1
-1
@@ -44,7 +44,7 @@ agentTests (ATransport t) = do
|
||||
describe "SQLite store" storeTests
|
||||
describe "Migration tests" migrationTests
|
||||
describe "SMP agent protocol syntax" $ syntaxTests t
|
||||
describe "Establishing duplex connection" $ do
|
||||
describe "Establishing duplex connection (via agent protocol)" $ do
|
||||
it "should connect via one server and one agent" $ do
|
||||
smpAgentTest2_1_1 $ testDuplexConnection t
|
||||
it "should connect via one server and one agent (random IDs)" $ do
|
||||
|
||||
@@ -177,6 +177,8 @@ functionalAPITests t = do
|
||||
testMatrix2 t runAgentClientTest
|
||||
it "should connect when server with multiple identities is stored" $
|
||||
withSmpServer t testServerMultipleIdentities
|
||||
it "should connect with two peers" $
|
||||
withSmpServer t testAgentClient3
|
||||
describe "Establishing duplex connection v2, different Ratchet versions" $
|
||||
testRatchetMatrix2 t runAgentClientTest
|
||||
describe "Establish duplex connection via contact address" $
|
||||
@@ -417,6 +419,32 @@ runAgentClientTest alice bob baseId = do
|
||||
where
|
||||
msgId = subtract baseId
|
||||
|
||||
testAgentClient3 :: HasCallStack => IO ()
|
||||
testAgentClient3 = do
|
||||
a <- getSMPAgentClient' agentCfg initAgentServers testDB
|
||||
b <- getSMPAgentClient' agentCfg initAgentServers testDB2
|
||||
c <- getSMPAgentClient' agentCfg initAgentServers testDB3
|
||||
runRight_ $ do
|
||||
(aIdForB, bId) <- makeConnection a b
|
||||
(aIdForC, cId) <- makeConnection a c
|
||||
|
||||
4 <- sendMessage a bId SMP.noMsgFlags "b4"
|
||||
4 <- sendMessage a cId SMP.noMsgFlags "c4"
|
||||
5 <- sendMessage a bId SMP.noMsgFlags "b5"
|
||||
5 <- sendMessage a cId SMP.noMsgFlags "c5"
|
||||
get a =##> \case ("", connId, SENT 4) -> connId == bId || connId == cId; _ -> False
|
||||
get a =##> \case ("", connId, SENT 4) -> connId == bId || connId == cId; _ -> False
|
||||
get a =##> \case ("", connId, SENT 5) -> connId == bId || connId == cId; _ -> False
|
||||
get a =##> \case ("", connId, SENT 5) -> connId == bId || connId == cId; _ -> False
|
||||
get b =##> \case ("", connId, Msg "b4") -> connId == aIdForB; _ -> False
|
||||
ackMessage b aIdForB 4 Nothing
|
||||
get b =##> \case ("", connId, Msg "b5") -> connId == aIdForB; _ -> False
|
||||
ackMessage b aIdForB 5 Nothing
|
||||
get c =##> \case ("", connId, Msg "c4") -> connId == aIdForC; _ -> False
|
||||
ackMessage c aIdForC 4 Nothing
|
||||
get c =##> \case ("", connId, Msg "c5") -> connId == aIdForC; _ -> False
|
||||
ackMessage c aIdForC 5 Nothing
|
||||
|
||||
runAgentClientContactTest :: HasCallStack => AgentClient -> AgentClient -> AgentMsgId -> IO ()
|
||||
runAgentClientContactTest alice bob baseId = do
|
||||
runRight_ $ do
|
||||
|
||||
+232
-11
@@ -22,9 +22,14 @@ import qualified Data.Text as T
|
||||
import Data.Text.Encoding (encodeUtf8)
|
||||
import Data.Time
|
||||
import Data.Word (Word32)
|
||||
import Database.SQLite.Simple (Only (..))
|
||||
import qualified Database.SQLite.Simple as SQL
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
import SMPClient (testKeyHash)
|
||||
import Simplex.FileTransfer.Client (XFTPChunkSpec (..))
|
||||
import Simplex.FileTransfer.Description
|
||||
import Simplex.FileTransfer.Protocol
|
||||
import Simplex.FileTransfer.Types
|
||||
import Simplex.Messaging.Agent.Client ()
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.Store
|
||||
@@ -32,6 +37,9 @@ import Simplex.Messaging.Agent.Store.SQLite
|
||||
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
|
||||
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Crypto.File (CryptoFile (..))
|
||||
import Simplex.Messaging.Encoding.String (StrEncoding (..))
|
||||
import Simplex.Messaging.Protocol (SubscriptionMode (..))
|
||||
import qualified Simplex.Messaging.Protocol as SMP
|
||||
import System.Random
|
||||
import Test.Hspec
|
||||
@@ -109,6 +117,16 @@ storeTests = do
|
||||
testCreateRcvMsg
|
||||
testCreateSndMsg
|
||||
testCreateRcvAndSndMsgs
|
||||
describe "Work items" $ do
|
||||
it "should getPendingQueueMsg" testGetPendingQueueMsg
|
||||
it "should getPendingServerCommand" testGetPendingServerCommand
|
||||
it "should getNextRcvChunkToDownload" testGetNextRcvChunkToDownload
|
||||
it "should getNextRcvFileToDecrypt" testGetNextRcvFileToDecrypt
|
||||
it "should getNextSndFileToPrepare" testGetNextSndFileToPrepare
|
||||
it "should getNextSndChunkToUpload" testGetNextSndChunkToUpload
|
||||
it "should getNextDeletedSndChunkReplica" testGetNextDeletedSndChunkReplica
|
||||
it "should markNtfSubActionNtfFailed_" testMarkNtfSubActionNtfFailed
|
||||
it "should markNtfSubActionSMPFailed_" testMarkNtfSubActionSMPFailed
|
||||
describe "open/close store" $ do
|
||||
it "should close and re-open" testCloseReopenStore
|
||||
it "should close and re-open encrypted store" testCloseReopenEncryptedStore
|
||||
@@ -163,12 +181,15 @@ testPrivDhKey = "MC4CAQAwBQYDK2VuBCIEINCzbVFaCiYHoYncxNY8tSIfn0pXcIAhLBfFc0m+gOp
|
||||
testDhSecret :: C.DhSecretX25519
|
||||
testDhSecret = "01234567890123456789012345678901"
|
||||
|
||||
smpServer1 :: SMPServer
|
||||
smpServer1 = SMPServer "smp.simplex.im" "5223" testKeyHash
|
||||
|
||||
rcvQueue1 :: NewRcvQueue
|
||||
rcvQueue1 =
|
||||
RcvQueue
|
||||
{ userId = 1,
|
||||
connId = "conn1",
|
||||
server = SMPServer "smp.simplex.im" "5223" testKeyHash,
|
||||
server = smpServer1,
|
||||
rcvId = "1234",
|
||||
rcvPrivateKey = testPrivateSignKey,
|
||||
rcvDhSecret = testDhSecret,
|
||||
@@ -190,7 +211,7 @@ sndQueue1 =
|
||||
SndQueue
|
||||
{ userId = 1,
|
||||
connId = "conn1",
|
||||
server = SMPServer "smp.simplex.im" "5223" testKeyHash,
|
||||
server = smpServer1,
|
||||
sndId = "3456",
|
||||
sndPublicKey = Nothing,
|
||||
sndPrivateKey = testPrivateSignKey,
|
||||
@@ -478,23 +499,25 @@ mkSndMsgData internalId internalSndId internalHash =
|
||||
prevMsgHash = internalHash
|
||||
}
|
||||
|
||||
testCreateSndMsg_ :: DB.Connection -> PrevSndMsgHash -> ConnId -> SndMsgData -> Expectation
|
||||
testCreateSndMsg_ db expectedPrevHash connId sndMsgData@SndMsgData {..} = do
|
||||
testCreateSndMsg_ :: DB.Connection -> PrevSndMsgHash -> ConnId -> SndQueue -> SndMsgData -> Expectation
|
||||
testCreateSndMsg_ db expectedPrevHash connId sq sndMsgData@SndMsgData {..} = do
|
||||
updateSndIds db connId
|
||||
`shouldReturn` (internalId, internalSndId, expectedPrevHash)
|
||||
createSndMsg db connId sndMsgData
|
||||
`shouldReturn` ()
|
||||
createSndMsgDelivery db connId sq internalId
|
||||
`shouldReturn` ()
|
||||
|
||||
testCreateSndMsg :: SpecWith SQLiteStore
|
||||
testCreateSndMsg =
|
||||
it "should create a SndMsg and return InternalId and PrevSndMsgHash" $ \st -> do
|
||||
g <- C.newRandom
|
||||
let ConnData {connId} = cData1
|
||||
_ <- withTransaction st $ \db -> do
|
||||
Right (_, sq) <- withTransaction st $ \db -> do
|
||||
createSndConn db g cData1 sndQueue1
|
||||
withTransaction st $ \db -> do
|
||||
testCreateSndMsg_ db "" connId $ mkSndMsgData (InternalId 1) (InternalSndId 1) "hash_dummy"
|
||||
testCreateSndMsg_ db "hash_dummy" connId $ mkSndMsgData (InternalId 2) (InternalSndId 2) "new_hash_dummy"
|
||||
testCreateSndMsg_ db "" connId sq $ mkSndMsgData (InternalId 1) (InternalSndId 1) "hash_dummy"
|
||||
testCreateSndMsg_ db "hash_dummy" connId sq $ mkSndMsgData (InternalId 2) (InternalSndId 2) "new_hash_dummy"
|
||||
|
||||
testCreateRcvAndSndMsgs :: SpecWith SQLiteStore
|
||||
testCreateRcvAndSndMsgs =
|
||||
@@ -504,13 +527,13 @@ testCreateRcvAndSndMsgs =
|
||||
g <- C.newRandom
|
||||
createRcvConn db g cData1 rcvQueue1 SCMInvitation
|
||||
withTransaction st $ \db -> do
|
||||
_ <- upgradeRcvConnToDuplex db "conn1" sndQueue1
|
||||
Right sq <- upgradeRcvConnToDuplex db "conn1" sndQueue1
|
||||
testCreateRcvMsg_ db 0 "" connId rq $ mkRcvMsgData (InternalId 1) (InternalRcvId 1) 1 "1" "rcv_hash_1"
|
||||
testCreateRcvMsg_ db 1 "rcv_hash_1" connId rq $ mkRcvMsgData (InternalId 2) (InternalRcvId 2) 2 "2" "rcv_hash_2"
|
||||
testCreateSndMsg_ db "" connId $ mkSndMsgData (InternalId 3) (InternalSndId 1) "snd_hash_1"
|
||||
testCreateSndMsg_ db "" connId sq $ mkSndMsgData (InternalId 3) (InternalSndId 1) "snd_hash_1"
|
||||
testCreateRcvMsg_ db 2 "rcv_hash_2" connId rq $ mkRcvMsgData (InternalId 4) (InternalRcvId 3) 3 "3" "rcv_hash_3"
|
||||
testCreateSndMsg_ db "snd_hash_1" connId $ mkSndMsgData (InternalId 5) (InternalSndId 2) "snd_hash_2"
|
||||
testCreateSndMsg_ db "snd_hash_2" connId $ mkSndMsgData (InternalId 6) (InternalSndId 3) "snd_hash_3"
|
||||
testCreateSndMsg_ db "snd_hash_1" connId sq $ mkSndMsgData (InternalId 5) (InternalSndId 2) "snd_hash_2"
|
||||
testCreateSndMsg_ db "snd_hash_2" connId sq $ mkSndMsgData (InternalId 6) (InternalSndId 3) "snd_hash_3"
|
||||
|
||||
testCloseReopenStore :: IO ()
|
||||
testCloseReopenStore = do
|
||||
@@ -562,3 +585,201 @@ hasMigrations st = getMigrations st `shouldReturn` True
|
||||
|
||||
errorGettingMigrations :: SQLiteStore -> Expectation
|
||||
errorGettingMigrations st = getMigrations st `shouldThrow` \(e :: SomeException) -> "ErrorMisuse" `isInfixOf` show e
|
||||
|
||||
testGetPendingQueueMsg :: SQLiteStore -> Expectation
|
||||
testGetPendingQueueMsg st = do
|
||||
g <- C.newRandom
|
||||
withTransaction st $ \db -> do
|
||||
Right (connId, sq) <- createSndConn db g cData1 {connId = ""} sndQueue1
|
||||
Right Nothing <- getPendingQueueMsg db connId sq
|
||||
testCreateSndMsg_ db "" connId sq $ mkSndMsgData (InternalId 1) (InternalSndId 1) "hash_dummy"
|
||||
DB.execute db "UPDATE messages SET msg_type = cast('bad' as blob) WHERE conn_id = ? AND internal_id = ?" (connId, 1 :: Int)
|
||||
testCreateSndMsg_ db "hash_dummy" connId sq $ mkSndMsgData (InternalId 2) (InternalSndId 2) "new_hash_dummy"
|
||||
|
||||
Left e <- getPendingQueueMsg db connId sq
|
||||
show e `shouldContain` "bad AgentMessageType"
|
||||
DB.query_ db "SELECT conn_id, internal_id FROM snd_message_deliveries WHERE failed = 1" `shouldReturn` [(connId, 1 :: Int)]
|
||||
|
||||
Right (Just (Nothing, PendingMsgData {msgId})) <- getPendingQueueMsg db connId sq
|
||||
msgId `shouldBe` InternalId 2
|
||||
|
||||
testGetPendingServerCommand :: SQLiteStore -> Expectation
|
||||
testGetPendingServerCommand st = do
|
||||
g <- C.newRandom
|
||||
withTransaction st $ \db -> do
|
||||
Right Nothing <- getPendingServerCommand db Nothing
|
||||
Right connId <- createNewConn db g cData1 {connId = ""} SCMInvitation
|
||||
Right () <- createCommand db "1" connId Nothing command
|
||||
corruptCmd db "1" connId
|
||||
Right () <- createCommand db "2" connId Nothing command
|
||||
|
||||
Left e <- getPendingServerCommand db Nothing
|
||||
show e `shouldContain` "bad AgentCmdType"
|
||||
DB.query_ db "SELECT conn_id, corr_id FROM commands WHERE failed = 1" `shouldReturn` [(connId, "1" :: ByteString)]
|
||||
|
||||
Right (Just PendingCommand {corrId}) <- getPendingServerCommand db Nothing
|
||||
corrId `shouldBe` "2"
|
||||
|
||||
Right _ <- updateNewConnRcv db connId rcvQueue1
|
||||
Right Nothing <- getPendingServerCommand db $ Just smpServer1
|
||||
Right () <- createCommand db "3" connId (Just smpServer1) command
|
||||
corruptCmd db "3" connId
|
||||
Right () <- createCommand db "4" connId (Just smpServer1) command
|
||||
|
||||
Left e' <- getPendingServerCommand db (Just smpServer1)
|
||||
show e' `shouldContain` "bad AgentCmdType"
|
||||
DB.query_ db "SELECT conn_id, corr_id FROM commands WHERE failed = 1" `shouldReturn` [(connId, "1" :: ByteString), (connId, "3" :: ByteString)]
|
||||
|
||||
Right (Just PendingCommand {corrId = corrId'}) <- getPendingServerCommand db (Just smpServer1)
|
||||
corrId' `shouldBe` "4"
|
||||
where
|
||||
command = AClientCommand $ APC SAEConn $ NEW True (ACM SCMInvitation) SMSubscribe
|
||||
corruptCmd :: DB.Connection -> ByteString -> ConnId -> IO ()
|
||||
corruptCmd db corrId connId = DB.execute db "UPDATE commands SET command = cast('bad' as blob) WHERE conn_id = ? AND corr_id = ?" (connId, corrId)
|
||||
|
||||
xftpServer1 :: SMP.XFTPServer
|
||||
xftpServer1 = SMP.ProtocolServer SMP.SPXFTP "xftp.simplex.im" "5223" testKeyHash
|
||||
|
||||
rcvFileDescr1 :: FileDescription 'FRecipient
|
||||
rcvFileDescr1 =
|
||||
FileDescription
|
||||
{ party = SFRecipient,
|
||||
size = FileSize $ mb 26,
|
||||
digest = FileDigest "abc",
|
||||
key = testFileSbKey,
|
||||
nonce = testFileCbNonce,
|
||||
chunkSize = defaultChunkSize,
|
||||
chunks =
|
||||
[ FileChunk
|
||||
{ chunkNo = 1,
|
||||
digest = chunkDigest,
|
||||
chunkSize = defaultChunkSize,
|
||||
replicas = [FileChunkReplica {server = xftpServer1, replicaId, replicaKey = testFileReplicaKey}]
|
||||
}
|
||||
]
|
||||
}
|
||||
where
|
||||
defaultChunkSize = FileSize $ mb 8
|
||||
replicaId = ChunkReplicaId "abc"
|
||||
chunkDigest = FileDigest "ghi"
|
||||
|
||||
testFileSbKey :: C.SbKey
|
||||
testFileSbKey = either error id $ strDecode "00n8p1tJq5E-SGnHcYTOrS4A9I07gTA_WFD6MTFFFOY="
|
||||
|
||||
testFileCbNonce :: C.CbNonce
|
||||
testFileCbNonce = either error id $ strDecode "dPSF-wrQpDiK_K6sYv0BDBZ9S4dg-jmu"
|
||||
|
||||
testFileReplicaKey :: C.APrivateSignKey
|
||||
testFileReplicaKey = C.APrivateSignKey C.SEd25519 "MC4CAQAwBQYDK2VwBCIEIDfEfevydXXfKajz3sRkcQ7RPvfWUPoq6pu1TYHV1DEe"
|
||||
|
||||
testGetNextRcvChunkToDownload :: SQLiteStore -> Expectation
|
||||
testGetNextRcvChunkToDownload st = do
|
||||
g <- C.newRandom
|
||||
withTransaction st $ \db -> do
|
||||
Right Nothing <- getNextRcvChunkToDownload db xftpServer1 86400
|
||||
|
||||
Right _ <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing)
|
||||
DB.execute_ db "UPDATE rcv_file_chunk_replicas SET replica_key = cast('bad' as blob) WHERE rcv_file_chunk_replica_id = 1"
|
||||
Right fId2 <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing)
|
||||
|
||||
Left e <- getNextRcvChunkToDownload db xftpServer1 86400
|
||||
show e `shouldContain` "ConversionFailed"
|
||||
DB.query_ db "SELECT rcv_file_id FROM rcv_files WHERE failed = 1" `shouldReturn` [Only (1 :: Int)]
|
||||
|
||||
Right (Just RcvFileChunk {rcvFileEntityId}) <- getNextRcvChunkToDownload db xftpServer1 86400
|
||||
rcvFileEntityId `shouldBe` fId2
|
||||
|
||||
testGetNextRcvFileToDecrypt :: SQLiteStore -> Expectation
|
||||
testGetNextRcvFileToDecrypt st = do
|
||||
g <- C.newRandom
|
||||
withTransaction st $ \db -> do
|
||||
Right Nothing <- getNextRcvFileToDecrypt db 86400
|
||||
|
||||
Right _ <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing)
|
||||
DB.execute_ db "UPDATE rcv_files SET status = 'received' WHERE rcv_file_id = 1"
|
||||
DB.execute_ db "UPDATE rcv_file_chunk_replicas SET replica_key = cast('bad' as blob) WHERE rcv_file_chunk_replica_id = 1"
|
||||
Right fId2 <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing)
|
||||
DB.execute_ db "UPDATE rcv_files SET status = 'received' WHERE rcv_file_id = 2"
|
||||
|
||||
Left e <- getNextRcvFileToDecrypt db 86400
|
||||
show e `shouldContain` "ConversionFailed"
|
||||
DB.query_ db "SELECT rcv_file_id FROM rcv_files WHERE failed = 1" `shouldReturn` [Only (1 :: Int)]
|
||||
|
||||
Right (Just RcvFile {rcvFileEntityId}) <- getNextRcvFileToDecrypt db 86400
|
||||
rcvFileEntityId `shouldBe` fId2
|
||||
|
||||
testGetNextSndFileToPrepare :: SQLiteStore -> Expectation
|
||||
testGetNextSndFileToPrepare st = do
|
||||
g <- C.newRandom
|
||||
withTransaction st $ \db -> do
|
||||
Right Nothing <- getNextSndFileToPrepare db 86400
|
||||
|
||||
Right _ <- createSndFile db g 1 (CryptoFile "filepath" Nothing) 1 "filepath" testFileSbKey testFileCbNonce
|
||||
DB.execute_ db "UPDATE snd_files SET status = 'new', num_recipients = 'bad' WHERE snd_file_id = 1"
|
||||
Right fId2 <- createSndFile db g 1 (CryptoFile "filepath" Nothing) 1 "filepath" testFileSbKey testFileCbNonce
|
||||
DB.execute_ db "UPDATE snd_files SET status = 'new' WHERE snd_file_id = 2"
|
||||
|
||||
Left e <- getNextSndFileToPrepare db 86400
|
||||
show e `shouldContain` "ConversionFailed"
|
||||
DB.query_ db "SELECT snd_file_id FROM snd_files WHERE failed = 1" `shouldReturn` [Only (1 :: Int)]
|
||||
|
||||
Right (Just SndFile {sndFileEntityId}) <- getNextSndFileToPrepare db 86400
|
||||
sndFileEntityId `shouldBe` fId2
|
||||
|
||||
newSndChunkReplica1 :: NewSndChunkReplica
|
||||
newSndChunkReplica1 =
|
||||
NewSndChunkReplica
|
||||
{ server = xftpServer1,
|
||||
replicaId = ChunkReplicaId "abc",
|
||||
replicaKey = testFileReplicaKey,
|
||||
rcvIdsKeys = [(ChunkReplicaId "abc", testFileReplicaKey)]
|
||||
}
|
||||
|
||||
testGetNextSndChunkToUpload :: SQLiteStore -> Expectation
|
||||
testGetNextSndChunkToUpload st = do
|
||||
g <- C.newRandom
|
||||
withTransaction st $ \db -> do
|
||||
Right Nothing <- getNextSndChunkToUpload db xftpServer1 86400
|
||||
|
||||
-- create file 1
|
||||
Right _ <- createSndFile db g 1 (CryptoFile "filepath" Nothing) 1 "filepath" testFileSbKey testFileCbNonce
|
||||
updateSndFileEncrypted db 1 (FileDigest "abc") [(XFTPChunkSpec "filepath" 1 1, FileDigest "ghi")]
|
||||
createSndFileReplica_ db 1 newSndChunkReplica1
|
||||
DB.execute_ db "UPDATE snd_files SET num_recipients = 'bad' WHERE snd_file_id = 1"
|
||||
-- create file 2
|
||||
Right fId2 <- createSndFile db g 1 (CryptoFile "filepath" Nothing) 1 "filepath" testFileSbKey testFileCbNonce
|
||||
updateSndFileEncrypted db 2 (FileDigest "abc") [(XFTPChunkSpec "filepath" 1 1, FileDigest "ghi")]
|
||||
createSndFileReplica_ db 2 newSndChunkReplica1
|
||||
|
||||
Left e <- getNextSndChunkToUpload db xftpServer1 86400
|
||||
show e `shouldContain` "ConversionFailed"
|
||||
DB.query_ db "SELECT snd_file_id FROM snd_files WHERE failed = 1" `shouldReturn` [Only (1 :: Int)]
|
||||
|
||||
Right (Just SndFileChunk {sndFileEntityId}) <- getNextSndChunkToUpload db xftpServer1 86400
|
||||
sndFileEntityId `shouldBe` fId2
|
||||
|
||||
testGetNextDeletedSndChunkReplica :: SQLiteStore -> Expectation
|
||||
testGetNextDeletedSndChunkReplica st = do
|
||||
withTransaction st $ \db -> do
|
||||
Right Nothing <- getNextDeletedSndChunkReplica db xftpServer1 86400
|
||||
|
||||
createDeletedSndChunkReplica db 1 (FileChunkReplica xftpServer1 (ChunkReplicaId "abc") testFileReplicaKey) (FileDigest "ghi")
|
||||
DB.execute_ db "UPDATE deleted_snd_chunk_replicas SET retries = 'bad' WHERE deleted_snd_chunk_replica_id = 1"
|
||||
createDeletedSndChunkReplica db 1 (FileChunkReplica xftpServer1 (ChunkReplicaId "abc") testFileReplicaKey) (FileDigest "ghi")
|
||||
|
||||
Left e <- getNextDeletedSndChunkReplica db xftpServer1 86400
|
||||
show e `shouldContain` "ConversionFailed"
|
||||
DB.query_ db "SELECT deleted_snd_chunk_replica_id FROM deleted_snd_chunk_replicas WHERE failed = 1" `shouldReturn` [Only (1 :: Int)]
|
||||
|
||||
Right (Just DeletedSndChunkReplica {deletedSndChunkReplicaId}) <- getNextDeletedSndChunkReplica db xftpServer1 86400
|
||||
deletedSndChunkReplicaId `shouldBe` 2
|
||||
|
||||
testMarkNtfSubActionNtfFailed :: SQLiteStore -> Expectation
|
||||
testMarkNtfSubActionNtfFailed st = do
|
||||
withTransaction st $ \db -> do
|
||||
markNtfSubActionNtfFailed_ db "abc"
|
||||
|
||||
testMarkNtfSubActionSMPFailed :: SQLiteStore -> Expectation
|
||||
testMarkNtfSubActionSMPFailed st = do
|
||||
withTransaction st $ \db -> do
|
||||
markNtfSubActionSMPFailed_ db "abc"
|
||||
|
||||
Reference in New Issue
Block a user