diff --git a/simplexmq.cabal b/simplexmq.cabal index 56e4fd2ec..f2929f8d1 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -99,6 +99,7 @@ library Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230814_indexes Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230829_crypto_files Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231222_command_created_at + Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items Simplex.Messaging.Agent.TAsyncs Simplex.Messaging.Agent.TRcvQueues Simplex.Messaging.Client diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index b592155ea..37416bdfb 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -908,10 +908,10 @@ sendMessagesB c reqs = withConnLocks c connIds "sendMessages" $ do enqueueCommand :: AgentMonad m => AgentClient -> ACorrId -> ConnId -> Maybe SMPServer -> AgentCommand -> m () enqueueCommand c corrId connId server aCommand = do withStore c $ \db -> createCommand db corrId connId server aCommand - resumeSrvCmds c server + void $ getAsyncCmdWorker (\w -> hasWorkToDo w $> w) c server resumeSrvCmds :: forall m. AgentMonad' m => AgentClient -> Maybe SMPServer -> m () -resumeSrvCmds = void .: getAsyncCmdWorker +resumeSrvCmds = void .: getAsyncCmdWorker pure resumeConnCmds :: forall m. AgentMonad m => AgentClient -> ConnId -> m () resumeConnCmds c connId = @@ -921,12 +921,11 @@ resumeConnCmds c connId = where connQueued = atomically $ isJust <$> TM.lookupInsert connId True (connCmdsQueued c) -getAsyncCmdWorker :: AgentMonad' m => AgentClient -> Maybe SMPServer -> m Worker -getAsyncCmdWorker c server = - atomically (getWorker >>= maybe createWorker resumeWorker) >>= \w -> runWorker w $> w +getAsyncCmdWorker :: AgentMonad' m => (Worker -> STM Worker) -> AgentClient -> Maybe SMPServer -> m Worker +getAsyncCmdWorker whenExists c server = + atomically (getWorker >>= maybe createWorker whenExists) >>= \w -> runWorker w $> w where getWorker = TM.lookup server $ asyncCmdWorkers c - resumeWorker w = hasWorkToDo w $> w deleteWorker wId = mapM_ $ \w -> when (wId == workerId w) $ TM.delete server $ asyncCmdWorkers c createWorker = do w <- newWorker c @@ -945,7 +944,7 @@ runCommandProcessing c@AgentClient {subQ} server_ doWork = do waitForWork doWork atomically $ throwWhenInactive c atomically $ beginAgentOperation c AOSndNetwork - withWork c doWork (\db -> getPendingServerCommand db server_) $ processCmd (riFast ri) + withWork c doWork (`getPendingServerCommand` server_) $ processCmd (riFast ri) where processCmd :: RetryInterval -> PendingCommand -> m () processCmd ri PendingCommand {cmdId, corrId, userId, connId, command} = case command of @@ -1127,15 +1126,14 @@ enqueueSavedMessageB c reqs = do in map (\sq -> createSndMsgDelivery db connId sq mId) sqs resumeMsgDelivery :: forall m. AgentMonad' m => AgentClient -> ConnData -> SndQueue -> m () -resumeMsgDelivery = void .:. getDeliveryWorker +resumeMsgDelivery = void .:. getDeliveryWorker pure -getDeliveryWorker :: AgentMonad' m => AgentClient -> ConnData -> SndQueue -> m (Worker, TMVar ()) -getDeliveryWorker c cData sq = do - atomically (getWorker >>= maybe createWorker resumeWorker) >>= \wl -> runWorker wl $> wl +getDeliveryWorker :: AgentMonad' m => ((Worker, TMVar ()) -> STM (Worker, TMVar ())) -> AgentClient -> ConnData -> SndQueue -> m (Worker, TMVar ()) +getDeliveryWorker whenExists c cData sq = do + atomically (getWorker >>= maybe createWorker whenExists) >>= \wl -> runWorker wl $> wl where qAddr = qAddress sq getWorker = TM.lookup qAddr $ smpDeliveryWorkers c - resumeWorker wl = hasWorkToDo (fst wl) $> wl deleteWorker wId = mapM_ $ \(w, _) -> when (wId == workerId w) $ TM.delete qAddr $ smpDeliveryWorkers c createWorker = do retryLock <- newEmptyTMVar @@ -1150,7 +1148,7 @@ getDeliveryWorker c cData sq = do submitPendingMsg :: AgentMonad' m => AgentClient -> ConnData -> SndQueue -> m () submitPendingMsg c cData sq = do atomically $ modifyTVar' (msgDeliveryOp c) $ \s -> s {opsInProgress = opsInProgress s + 1} - resumeMsgDelivery c cData sq + void $ getDeliveryWorker (\wl -> hasWorkToDo (fst wl) $> wl) c cData sq runSmpQueueMsgDelivery :: forall m. AgentMonad m => AgentClient -> ConnData -> SndQueue -> TMVar () -> TMVar () -> m () runSmpQueueMsgDelivery c@AgentClient {subQ} cData@ConnData {userId, connId, duplexHandshake} sq doWork qLock = do @@ -2189,7 +2187,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s case findQ addr sqs of Just sq -> do logServer "<--" c srv rId "MSG " - atomically $ + atomically $ TM.lookup (qAddress sq) (smpDeliveryWorkers c) >>= mapM_ (\(_, retryLock) -> tryPutTMVar retryLock ()) Nothing -> qError "QCONT: queue address not found" diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 3038f57a8..0203de41e 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -139,10 +139,9 @@ import Data.Functor (($>)) import Data.List (deleteFirstsBy, foldl', partition, (\\)) import Data.List.NonEmpty (NonEmpty (..), (<|)) import qualified Data.List.NonEmpty as L -import Data.Maybe (isNothing) import Data.Map.Strict (Map) import qualified Data.Map.Strict as M -import Data.Maybe (listToMaybe) +import Data.Maybe (isNothing, listToMaybe) import Data.Set (Set) import qualified Data.Set as S import Data.Text (Text) @@ -1241,15 +1240,18 @@ cryptoError = \case waitForWork :: AgentMonad' m => TMVar () -> m () waitForWork = void . atomically . readTMVar -withWork :: AgentMonad m => AgentClient -> TMVar () -> (DB.Connection -> IO (Maybe a)) -> (a -> m ()) -> m () -withWork c doWork getWork action = do - r <- withStore' c $ \db -> do - r' <- getWork db - liftIO $ when (isNothing r') $ noWorkToDo doWork - pure r' - mapM_ action r +withWork :: AgentMonad m => AgentClient -> TMVar () -> (DB.Connection -> IO (Either StoreError (Maybe a))) -> (a -> m ()) -> m () +withWork c doWork getWork action = + withStore' c getWork >>= \case + Right (Just r) -> action r + Right Nothing -> noWork + Left e@SEWorkItemError {} -> noWork >> notifyErr e + Left e -> notifyErr e + where + noWork = liftIO $ noWorkToDo doWork + notifyErr e = atomically $ writeTBQueue (subQ c) ("", "", APC SAEConn $ ERR $ INTERNAL $ show e) -noWorkToDo :: TMVar () -> IO () +noWorkToDo :: TMVar () -> IO () noWorkToDo = void . atomically . tryTakeTMVar hasWorkToDo :: Worker -> STM () @@ -1356,14 +1358,15 @@ withStoreCtx_ ctx_ c action = do withStoreBatch :: (AgentMonad' m, Traversable t) => AgentClient -> (DB.Connection -> t (IO (Either AgentErrorType a))) -> m (t (Either AgentErrorType a)) withStoreBatch c actions = do st <- asks store - liftIO $ agentOperationBracket c AODatabase (\_ -> pure ()) $ - withTransaction st $ mapM (`E.catch` handleInternal) . actions + liftIO . agentOperationBracket c AODatabase (\_ -> pure ()) $ + withTransaction st $ + mapM (`E.catch` handleInternal) . actions where handleInternal :: E.SomeException -> IO (Either AgentErrorType a) handleInternal = pure . Left . INTERNAL . show withStoreBatch' :: (AgentMonad' m, Traversable t) => AgentClient -> (DB.Connection -> t (IO a)) -> m (t (Either AgentErrorType a)) -withStoreBatch' c actions = withStoreBatch c $ \db -> fmap Right <$> actions db +withStoreBatch' c actions = withStoreBatch c (fmap (fmap Right) . actions) storeError :: StoreError -> AgentErrorType storeError = \case diff --git a/src/Simplex/Messaging/Agent/Protocol.hs b/src/Simplex/Messaging/Agent/Protocol.hs index c79df9a70..49fa40643 100644 --- a/src/Simplex/Messaging/Agent/Protocol.hs +++ b/src/Simplex/Messaging/Agent/Protocol.hs @@ -1703,8 +1703,8 @@ commandP binaryP = >>= \case ACmdTag SClient e cmd -> ACmd SClient e <$> case cmd of - NEW_ -> s (NEW <$> strP_ <*> strP_ <*> strP) - JOIN_ -> s (JOIN <$> strP_ <*> strP_ <*> strP_ <*> binaryP) + NEW_ -> s (NEW <$> strP_ <*> strP_ <*> (strP <|> pure SMP.SMSubscribe)) + JOIN_ -> s (JOIN <$> strP_ <*> strP_ <*> (strP_ <|> pure SMP.SMSubscribe) <*> binaryP) LET_ -> s (LET <$> A.takeTill (== ' ') <* A.space <*> binaryP) ACPT_ -> s (ACPT <$> A.takeTill (== ' ') <* A.space <*> binaryP) RJCT_ -> s (RJCT <$> A.takeByteString) diff --git a/src/Simplex/Messaging/Agent/Store.hs b/src/Simplex/Messaging/Agent/Store.hs index c96654d5b..028b7d8ea 100644 --- a/src/Simplex/Messaging/Agent/Store.hs +++ b/src/Simplex/Messaging/Agent/Store.hs @@ -645,4 +645,6 @@ data StoreError SEFileNotFound | -- | XFTP Deleted snd chunk replica not found. SEDeletedSndChunkReplicaNotFound + | -- | Error when reading work item that suspends worker - do not use! + SEWorkItemError ByteString deriving (Eq, Show, Exception) diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index 62dcd0fb0..559778355 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -152,7 +152,9 @@ module Simplex.Messaging.Agent.Store.SQLite setNullNtfSubscriptionAction, deleteNtfSubscription, getNextNtfSubNTFAction, + markNtfSubActionNtfFailed_, -- exported for tests getNextNtfSubSMPAction, + markNtfSubActionSMPFailed_, -- exported for tests getActiveNtfToken, getNtfRcvQueue, setConnectionNtfs, @@ -191,6 +193,7 @@ module Simplex.Messaging.Agent.Store.SQLite deleteSndFile', getSndFileDeleted, createSndFileReplica, + createSndFileReplica_, -- exported for tests getNextSndChunkToUpload, updateSndChunkReplicaDelay, addSndChunkReplicaRecipients, @@ -222,7 +225,7 @@ import Control.Monad.IO.Class import Crypto.Random (ChaChaDRG) import qualified Data.Aeson.TH as J import qualified Data.Attoparsec.ByteString.Char8 as A -import Data.Bifunctor (second) +import Data.Bifunctor (first, second) import Data.ByteArray (ScrubbedBytes) import qualified Data.ByteArray as BA import Data.ByteString (ByteString) @@ -271,7 +274,7 @@ import Simplex.Messaging.Parsers (blobFieldParser, defaultJSON, dropPrefix, from import Simplex.Messaging.Protocol import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Transport.Client (TransportHost) -import Simplex.Messaging.Util (bshow, eitherToMaybe, ifM, safeDecodeUtf8, ($>>=), (<$$>)) +import Simplex.Messaging.Util (bshow, catchAllErrors, eitherToMaybe, ifM, safeDecodeUtf8, ($>>=), (<$$>)) import Simplex.Messaging.Version import System.Directory (copyFile, createDirectoryIfMissing, doesFileExist) import System.Exit (exitFailure) @@ -975,29 +978,60 @@ updateSndMsgRcpt db connId sndMsgId MsgReceipt {agentMsgId, msgRcptStatus} = "UPDATE snd_messages SET rcpt_internal_id = ?, rcpt_status = ? WHERE conn_id = ? AND internal_snd_id = ?" (agentMsgId, msgRcptStatus, connId, sndMsgId) -getPendingQueueMsg :: DB.Connection -> ConnId -> SndQueue -> IO (Maybe (Maybe RcvQueue, PendingMsgData)) -getPendingQueueMsg db connId SndQueue {dbQueueId} = do - rq_ <- L.head <$$> getRcvQueuesByConnId_ db connId - (rq_,) <$$> maybeFirstRow pendingMsgData getMsgData_ +getPendingQueueMsg :: DB.Connection -> ConnId -> SndQueue -> IO (Either StoreError (Maybe (Maybe RcvQueue, PendingMsgData))) +getPendingQueueMsg db connId SndQueue {dbQueueId} = + getWorkItem "message" getMsgId getMsgData markMsgFailed where - getMsgData_ = - DB.query - db - [sql| - SELECT m.internal_id, m.msg_type, m.msg_flags, m.msg_body, m.internal_ts, s.retry_int_slow, s.retry_int_fast - FROM snd_message_deliveries d - JOIN messages m ON m.conn_id = d.conn_id AND m.internal_id = d.internal_id - JOIN snd_messages s ON s.conn_id = d.conn_id AND s.internal_id = d.internal_id - WHERE d.conn_id = ? AND d.snd_queue_id = ? - ORDER BY d.internal_id ASC - LIMIT 1 - |] - (connId, dbQueueId) - pendingMsgData :: (InternalId, AgentMessageType, Maybe MsgFlags, MsgBody, InternalTs, Maybe Int64, Maybe Int64) -> PendingMsgData - pendingMsgData (msgId, msgType, msgFlags_, msgBody, internalTs, riSlow_, riFast_) = - let msgFlags = fromMaybe SMP.noMsgFlags msgFlags_ - msgRetryState = RI2State <$> riSlow_ <*> riFast_ - in PendingMsgData {msgId, msgType, msgFlags, msgBody, msgRetryState, internalTs} + getMsgId :: IO (Maybe InternalId) + getMsgId = + maybeFirstRow fromOnly $ + DB.query + db + [sql| + SELECT internal_id + FROM snd_message_deliveries d + WHERE conn_id = ? AND snd_queue_id = ? AND failed = 0 + ORDER BY internal_id ASC + LIMIT 1 + |] + (connId, dbQueueId) + getMsgData :: InternalId -> IO (Either StoreError (Maybe RcvQueue, PendingMsgData)) + getMsgData msgId = runExceptT $ do + msg <- ExceptT $ firstRow pendingMsgData err getMsgData_ + rq_ <- liftIO $ L.head <$$> getRcvQueuesByConnId_ db connId + pure (rq_, msg) + where + getMsgData_ = + DB.query + db + [sql| + SELECT m.msg_type, m.msg_flags, m.msg_body, m.internal_ts, s.retry_int_slow, s.retry_int_fast + FROM messages m + JOIN snd_messages s ON s.conn_id = m.conn_id AND s.internal_id = m.internal_id + WHERE m.conn_id = ? AND m.internal_id = ? + |] + (connId, msgId) + err = SEInternal $ "msg delivery " <> bshow msgId <> " returned []" + pendingMsgData :: (AgentMessageType, Maybe MsgFlags, MsgBody, InternalTs, Maybe Int64, Maybe Int64) -> PendingMsgData + pendingMsgData (msgType, msgFlags_, msgBody, internalTs, riSlow_, riFast_) = + let msgFlags = fromMaybe SMP.noMsgFlags msgFlags_ + msgRetryState = RI2State <$> riSlow_ <*> riFast_ + in PendingMsgData {msgId, msgType, msgFlags, msgBody, msgRetryState, internalTs} + markMsgFailed msgId = DB.execute db "UPDATE snd_message_deliveries SET failed = 1 WHERE conn_id = ? AND internal_id = ?" (connId, msgId) + +getWorkItem :: Show i => ByteString -> IO (Maybe i) -> (i -> IO (Either StoreError a)) -> (i -> IO ()) -> IO (Either StoreError (Maybe a)) +getWorkItem itemName getId getItem markFailed = + runExceptT $ handleErr "getId" getId >>= mapM tryGetItem + where + tryGetItem itemId = ExceptT (getItem itemId) `catchStoreErrors` \e -> mark itemId >> throwError e + mark itemId = handleErr ("markFailed ID " <> bshow itemId) $ markFailed itemId + catchStoreErrors = catchAllErrors (SEInternal . bshow) + -- Errors caught by this function will suspend worker as if there is no more work, + handleErr :: ByteString -> IO a -> ExceptT StoreError IO a + handleErr opName action = ExceptT $ first mkError <$> E.try action + where + mkError :: E.SomeException -> StoreError + mkError e = SEWorkItemError $ itemName <> " " <> opName <> " error: " <> bshow e updatePendingMsgRIState :: DB.Connection -> ConnId -> InternalId -> RI2State -> IO () updatePendingMsgRIState db connId msgId RI2State {slowInterval, fastInterval} = @@ -1095,7 +1129,7 @@ deleteSndMsgDelivery db connId SndQueue {dbQueueId} msgId keepForReceipt = do countPendingSndDeliveries_ :: DB.Connection -> ConnId -> InternalId -> IO Int countPendingSndDeliveries_ db connId msgId = do - (Only cnt : _) <- DB.query db "SELECT count(*) FROM snd_message_deliveries WHERE conn_id = ? AND internal_id = ?" (connId, msgId) + (Only cnt : _) <- DB.query db "SELECT count(*) FROM snd_message_deliveries WHERE conn_id = ? AND internal_id = ? AND failed = 0" (connId, msgId) pure cnt deleteRcvMsgHashesExpired :: DB.Connection -> NominalDiffTime -> IO () @@ -1204,11 +1238,12 @@ updateRatchet db connId rc skipped = do createCommand :: DB.Connection -> ACorrId -> ConnId -> Maybe SMPServer -> AgentCommand -> IO (Either StoreError ()) createCommand db corrId connId srv_ cmd = runExceptT $ do (host_, port_, serverKeyHash_) <- serverFields - liftIO $ do + createdAt <- liftIO getCurrentTime + liftIO $ DB.execute db - "INSERT INTO commands (host, port, corr_id, conn_id, command_tag, command, server_key_hash) VALUES (?,?,?,?,?,?,?)" - (host_, port_, corrId, connId, agentCommandTag cmd, cmd, serverKeyHash_) + "INSERT INTO commands (host, port, corr_id, conn_id, command_tag, command, server_key_hash, created_at) VALUES (?,?,?,?,?,?,?,?)" + (host_, port_, corrId, connId, agentCommandTag cmd, cmd, serverKeyHash_, createdAt) where serverFields :: ExceptT StoreError IO (Maybe (NonEmpty TransportHost), Maybe ServiceName, Maybe C.KeyHash) serverFields = case srv_ of @@ -1235,33 +1270,47 @@ getPendingCommandServers db connId = do where smpServer (host, port, keyHash) = SMPServer <$> host <*> port <*> keyHash -getPendingServerCommand :: DB.Connection -> Maybe SMPServer -> IO (Maybe PendingCommand) -getPendingServerCommand db srv_ = maybeFirstRow pendingCommand $ case srv_ of - Nothing -> - DB.query_ - db - [sql| - SELECT c.command_id, c.corr_id, cs.user_id, c.conn_id, c.command - FROM commands c - JOIN connections cs USING (conn_id) - WHERE c.host IS NULL AND c.port IS NULL - ORDER BY c.created_at ASC, c.command_id ASC - LIMIT 1 - |] - Just (SMPServer host port _) -> - DB.query - db - [sql| - SELECT c.command_id, c.corr_id, cs.user_id, c.conn_id, c.command - FROM commands c - JOIN connections cs USING (conn_id) - WHERE c.host = ? AND c.port = ? - ORDER BY c.created_at ASC, c.command_id ASC - LIMIT 1 - |] - (host, port) +getPendingServerCommand :: DB.Connection -> Maybe SMPServer -> IO (Either StoreError (Maybe PendingCommand)) +getPendingServerCommand db srv_ = getWorkItem "command" getCmdId getCommand markCommandFailed where - pendingCommand (cmdId, corrId, userId, connId, command) = PendingCommand {cmdId, corrId, userId, connId, command} + getCmdId :: IO (Maybe Int64) + getCmdId = + maybeFirstRow fromOnly $ case srv_ of + Nothing -> + DB.query_ + db + [sql| + SELECT command_id FROM commands + WHERE host IS NULL AND port IS NULL AND failed = 0 + ORDER BY created_at ASC, command_id ASC + LIMIT 1 + |] + Just (SMPServer host port _) -> + DB.query + db + [sql| + SELECT command_id FROM commands + WHERE host = ? AND port = ? AND failed = 0 + ORDER BY created_at ASC, command_id ASC + LIMIT 1 + |] + (host, port) + getCommand :: Int64 -> IO (Either StoreError PendingCommand) + getCommand cmdId = + firstRow pendingCommand err $ + DB.query + db + [sql| + SELECT c.corr_id, cs.user_id, c.conn_id, c.command + FROM commands c + JOIN connections cs USING (conn_id) + WHERE c.command_id = ? + |] + (Only cmdId) + where + err = SEInternal $ "command " <> bshow cmdId <> " returned []" + pendingCommand (corrId, userId, connId, command) = PendingCommand {cmdId, corrId, userId, connId, command} + markCommandFailed cmdId = DB.execute db "UPDATE commands SET failed = 1 WHERE command_id = ?" (Only cmdId) deleteCommand :: DB.Connection -> AsyncCmdId -> IO () deleteCommand db cmdId = @@ -1486,53 +1535,95 @@ deleteNtfSubscription db connId = do (Nothing :: Maybe SMP.NotifierId, Nothing :: Maybe NtfSubscriptionId, NASDeleted, False, updatedAt, connId) else DB.execute db "DELETE FROM ntf_subscriptions WHERE conn_id = ?" (Only connId) -getNextNtfSubNTFAction :: DB.Connection -> NtfServer -> IO (Maybe (NtfSubscription, NtfSubNTFAction, NtfActionTs)) -getNextNtfSubNTFAction db ntfServer@(NtfServer ntfHost ntfPort _) = do - maybeFirstRow ntfSubAction getNtfSubAction_ $>>= \a@(NtfSubscription {connId}, _, _) -> do - DB.execute db "UPDATE ntf_subscriptions SET updated_by_supervisor = ? WHERE conn_id = ?" (False, connId) - pure $ Just a +getNextNtfSubNTFAction :: DB.Connection -> NtfServer -> IO (Either StoreError (Maybe (NtfSubscription, NtfSubNTFAction, NtfActionTs))) +getNextNtfSubNTFAction db ntfServer@(NtfServer ntfHost ntfPort _) = + getWorkItem "ntf NTF" getNtfConnId getNtfSubAction (markNtfSubActionNtfFailed_ db) where - getNtfSubAction_ = - DB.query - db - [sql| - SELECT ns.conn_id, s.host, s.port, COALESCE(ns.smp_server_key_hash, s.key_hash), - ns.smp_ntf_id, ns.ntf_sub_id, ns.ntf_sub_status, ns.ntf_sub_action_ts, ns.ntf_sub_action - FROM ntf_subscriptions ns - JOIN servers s ON s.host = ns.smp_host AND s.port = ns.smp_port - WHERE ns.ntf_host = ? AND ns.ntf_port = ? AND ns.ntf_sub_action IS NOT NULL - ORDER BY ns.ntf_sub_action_ts ASC - LIMIT 1 - |] - (ntfHost, ntfPort) - ntfSubAction (connId, smpHost, smpPort, smpKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, action) = - let smpServer = SMPServer smpHost smpPort smpKeyHash - ntfSubscription = NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus} - in (ntfSubscription, action, actionTs) + getNtfConnId :: IO (Maybe ConnId) + getNtfConnId = + maybeFirstRow fromOnly $ + DB.query + db + [sql| + SELECT conn_id + FROM ntf_subscriptions + WHERE ntf_host = ? AND ntf_port = ? AND ntf_sub_action IS NOT NULL + AND (ntf_failed = 0 OR updated_by_supervisor = 1) + ORDER BY ntf_sub_action_ts ASC + LIMIT 1 + |] + (ntfHost, ntfPort) + getNtfSubAction :: ConnId -> IO (Either StoreError (NtfSubscription, NtfSubNTFAction, NtfActionTs)) + getNtfSubAction connId = do + markUpdatedByWorker db connId + firstRow ntfSubAction err $ + DB.query + db + [sql| + SELECT s.host, s.port, COALESCE(ns.smp_server_key_hash, s.key_hash), + ns.smp_ntf_id, ns.ntf_sub_id, ns.ntf_sub_status, ns.ntf_sub_action_ts, ns.ntf_sub_action + FROM ntf_subscriptions ns + JOIN servers s ON s.host = ns.smp_host AND s.port = ns.smp_port + WHERE ns.conn_id = ? + |] + (Only connId) + where + err = SEInternal $ "ntf subscription " <> bshow connId <> " returned []" + ntfSubAction (smpHost, smpPort, smpKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, action) = + let smpServer = SMPServer smpHost smpPort smpKeyHash + ntfSubscription = NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus} + in (ntfSubscription, action, actionTs) -getNextNtfSubSMPAction :: DB.Connection -> SMPServer -> IO (Maybe (NtfSubscription, NtfSubSMPAction, NtfActionTs)) -getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) = do - maybeFirstRow ntfSubAction getNtfSubAction_ $>>= \a@(NtfSubscription {connId}, _, _) -> do - DB.execute db "UPDATE ntf_subscriptions SET updated_by_supervisor = ? WHERE conn_id = ?" (False, connId) - pure $ Just a +markNtfSubActionNtfFailed_ :: DB.Connection -> ConnId -> IO () +markNtfSubActionNtfFailed_ db connId = + DB.execute db "UPDATE ntf_subscriptions SET ntf_failed = 1 where conn_id = ?" (Only connId) + +getNextNtfSubSMPAction :: DB.Connection -> SMPServer -> IO (Either StoreError (Maybe (NtfSubscription, NtfSubSMPAction, NtfActionTs))) +getNextNtfSubSMPAction db smpServer@(SMPServer smpHost smpPort _) = + getWorkItem "ntf SMP" getNtfConnId getNtfSubAction (markNtfSubActionSMPFailed_ db) where - getNtfSubAction_ = - DB.query - db - [sql| - SELECT ns.conn_id, s.ntf_host, s.ntf_port, s.ntf_key_hash, - ns.smp_ntf_id, ns.ntf_sub_id, ns.ntf_sub_status, ns.ntf_sub_action_ts, ns.ntf_sub_smp_action - FROM ntf_subscriptions ns - JOIN ntf_servers s USING (ntf_host, ntf_port) - WHERE ns.smp_host = ? AND ns.smp_port = ? AND ns.ntf_sub_smp_action IS NOT NULL AND ns.ntf_sub_action_ts IS NOT NULL - ORDER BY ns.ntf_sub_action_ts ASC - LIMIT 1 - |] - (smpHost, smpPort) - ntfSubAction (connId, ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, action) = - let ntfServer = NtfServer ntfHost ntfPort ntfKeyHash - ntfSubscription = NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus} - in (ntfSubscription, action, actionTs) + getNtfConnId :: IO (Maybe ConnId) + getNtfConnId = + maybeFirstRow fromOnly $ + DB.query + db + [sql| + SELECT conn_id + FROM ntf_subscriptions ns + WHERE smp_host = ? AND smp_port = ? AND ntf_sub_smp_action IS NOT NULL AND ntf_sub_action_ts IS NOT NULL + AND (smp_failed = 0 OR updated_by_supervisor = 1) + ORDER BY ntf_sub_action_ts ASC + LIMIT 1 + |] + (smpHost, smpPort) + getNtfSubAction :: ConnId -> IO (Either StoreError (NtfSubscription, NtfSubSMPAction, NtfActionTs)) + getNtfSubAction connId = do + markUpdatedByWorker db connId + firstRow ntfSubAction err $ + DB.query + db + [sql| + SELECT s.ntf_host, s.ntf_port, s.ntf_key_hash, + ns.smp_ntf_id, ns.ntf_sub_id, ns.ntf_sub_status, ns.ntf_sub_action_ts, ns.ntf_sub_smp_action + FROM ntf_subscriptions ns + JOIN ntf_servers s USING (ntf_host, ntf_port) + WHERE ns.conn_id = ? + |] + (Only connId) + where + err = SEInternal $ "ntf subscription " <> bshow connId <> " returned []" + ntfSubAction (ntfHost, ntfPort, ntfKeyHash, ntfQueueId, ntfSubId, ntfSubStatus, actionTs, action) = + let ntfServer = NtfServer ntfHost ntfPort ntfKeyHash + ntfSubscription = NtfSubscription {connId, smpServer, ntfQueueId, ntfServer, ntfSubId, ntfSubStatus} + in (ntfSubscription, action, actionTs) + +markNtfSubActionSMPFailed_ :: DB.Connection -> ConnId -> IO () +markNtfSubActionSMPFailed_ db connId = + DB.execute db "UPDATE ntf_subscriptions SET smp_failed = 1 where conn_id = ?" (Only connId) + +markUpdatedByWorker :: DB.Connection -> ConnId -> IO () +markUpdatedByWorker db connId = + DB.execute db "UPDATE ntf_subscriptions SET updated_by_supervisor = 0 WHERE conn_id = ?" (Only connId) getActiveNtfToken :: DB.Connection -> IO (Maybe NtfToken) getActiveNtfToken db = @@ -2289,60 +2380,84 @@ deleteRcvFile' :: DB.Connection -> DBRcvFileId -> IO () deleteRcvFile' db rcvFileId = DB.execute db "DELETE FROM rcv_files WHERE rcv_file_id = ?" (Only rcvFileId) -getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Maybe RcvFileChunk) +getNextRcvChunkToDownload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Either StoreError (Maybe RcvFileChunk)) getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = do - cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime - maybeFirstRow toChunk $ - DB.query - db - [sql| - SELECT - f.rcv_file_id, f.rcv_file_entity_id, f.user_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path, - r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.delay, r.retries - FROM rcv_file_chunk_replicas r - JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id - JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id - JOIN rcv_files f ON f.rcv_file_id = c.rcv_file_id - WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ? - AND r.received = 0 AND r.replica_number = 1 - AND f.status = ? AND f.deleted = 0 AND f.created_at >= ? - ORDER BY r.created_at ASC - LIMIT 1 - |] - (host, port, keyHash, RFSReceiving, cutoffTs) + getWorkItem "rcv_file_download" getReplicaId getChunkData (markRcvFileFailed db . snd) where - toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Maybe Int64, Int)) -> RcvFileChunk - toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries)) = - RcvFileChunk - { rcvFileId, - rcvFileEntityId, - userId, - rcvChunkId, - chunkNo, - chunkSize, - digest, - fileTmpPath, - chunkTmpPath, - replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, delay, retries}] - } + getReplicaId :: IO (Maybe (Int64, DBRcvFileId)) + getReplicaId = do + cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime + maybeFirstRow id $ + DB.query + db + [sql| + SELECT r.rcv_file_chunk_replica_id, f.rcv_file_id + FROM rcv_file_chunk_replicas r + JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id + JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id + JOIN rcv_files f ON f.rcv_file_id = c.rcv_file_id + WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ? + AND r.received = 0 AND r.replica_number = 1 + AND f.status = ? AND f.deleted = 0 AND f.created_at >= ? + AND f.failed = 0 + ORDER BY r.created_at ASC + LIMIT 1 + |] + (host, port, keyHash, RFSReceiving, cutoffTs) + getChunkData :: (Int64, DBRcvFileId) -> IO (Either StoreError RcvFileChunk) + getChunkData (rcvFileChunkReplicaId, _fileId) = + firstRow toChunk SEFileNotFound $ + DB.query + db + [sql| + SELECT + f.rcv_file_id, f.rcv_file_entity_id, f.user_id, c.rcv_file_chunk_id, c.chunk_no, c.chunk_size, c.digest, f.tmp_path, c.tmp_path, + r.rcv_file_chunk_replica_id, r.replica_id, r.replica_key, r.received, r.delay, r.retries + FROM rcv_file_chunk_replicas r + JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id + JOIN rcv_file_chunks c ON c.rcv_file_chunk_id = r.rcv_file_chunk_id + JOIN rcv_files f ON f.rcv_file_id = c.rcv_file_id + WHERE r.rcv_file_chunk_replica_id = ? + |] + (Only rcvFileChunkReplicaId) + where + toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Maybe Int64, Int)) -> RcvFileChunk + toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries)) = + RcvFileChunk + { rcvFileId, + rcvFileEntityId, + userId, + rcvChunkId, + chunkNo, + chunkSize, + digest, + fileTmpPath, + chunkTmpPath, + replicas = [RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, delay, retries}] + } -getNextRcvFileToDecrypt :: DB.Connection -> NominalDiffTime -> IO (Maybe RcvFile) -getNextRcvFileToDecrypt db ttl = do - cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime - fileId_ :: Maybe DBRcvFileId <- - maybeFirstRow fromOnly $ - DB.query - db - [sql| - SELECT rcv_file_id - FROM rcv_files - WHERE status IN (?,?) AND deleted = 0 AND created_at >= ? - ORDER BY created_at ASC LIMIT 1 - |] - (RFSReceived, RFSDecrypting, cutoffTs) - case fileId_ of - Nothing -> pure Nothing - Just fileId -> eitherToMaybe <$> getRcvFile db fileId +getNextRcvFileToDecrypt :: DB.Connection -> NominalDiffTime -> IO (Either StoreError (Maybe RcvFile)) +getNextRcvFileToDecrypt db ttl = + getWorkItem "rcv_file_decrypt" getFileId (getRcvFile db) (markRcvFileFailed db) + where + getFileId :: IO (Maybe DBRcvFileId) + getFileId = do + cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime + maybeFirstRow fromOnly $ + DB.query + db + [sql| + SELECT rcv_file_id + FROM rcv_files + WHERE status IN (?,?) AND deleted = 0 AND created_at >= ? + AND failed = 0 + ORDER BY created_at ASC LIMIT 1 + |] + (RFSReceived, RFSDecrypting, cutoffTs) + +markRcvFileFailed :: DB.Connection -> DBRcvFileId -> IO () +markRcvFileFailed db fileId = do + DB.execute db "UPDATE rcv_files SET failed = 1 WHERE rcv_file_id = ?" (Only fileId) getPendingRcvFilesServers :: DB.Connection -> NominalDiffTime -> IO [XFTPServer] getPendingRcvFilesServers db ttl = do @@ -2494,23 +2609,28 @@ getChunkReplicaRecipients_ db replicaId = |] (Only replicaId) -getNextSndFileToPrepare :: DB.Connection -> NominalDiffTime -> IO (Maybe SndFile) -getNextSndFileToPrepare db ttl = do - cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime - fileId_ :: Maybe DBSndFileId <- - maybeFirstRow fromOnly $ - DB.query - db - [sql| - SELECT snd_file_id - FROM snd_files - WHERE status IN (?,?,?) AND deleted = 0 AND created_at >= ? - ORDER BY created_at ASC LIMIT 1 - |] - (SFSNew, SFSEncrypting, SFSEncrypted, cutoffTs) - case fileId_ of - Nothing -> pure Nothing - Just fileId -> eitherToMaybe <$> getSndFile db fileId +getNextSndFileToPrepare :: DB.Connection -> NominalDiffTime -> IO (Either StoreError (Maybe SndFile)) +getNextSndFileToPrepare db ttl = + getWorkItem "snd_file_prepare" getFileId (getSndFile db) (markSndFileFailed db) + where + getFileId :: IO (Maybe DBSndFileId) + getFileId = do + cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime + maybeFirstRow fromOnly $ + DB.query + db + [sql| + SELECT snd_file_id + FROM snd_files + WHERE status IN (?,?,?) AND deleted = 0 AND created_at >= ? + AND failed = 0 + ORDER BY created_at ASC LIMIT 1 + |] + (SFSNew, SFSEncrypting, SFSEncrypted, cutoffTs) + +markSndFileFailed :: DB.Connection -> DBSndFileId -> IO () +markSndFileFailed db fileId = + DB.execute db "UPDATE snd_files SET failed = 1 WHERE snd_file_id = ?" (Only fileId) updateSndFileError :: DB.Connection -> DBSndFileId -> String -> IO () updateSndFileError db sndFileId errStr = do @@ -2554,7 +2674,10 @@ getSndFileDeleted db sndFileId = <$> maybeFirstRow fromOnly (DB.query db "SELECT deleted FROM snd_files WHERE snd_file_id = ?" (Only sndFileId)) createSndFileReplica :: DB.Connection -> SndFileChunk -> NewSndChunkReplica -> IO () -createSndFileReplica db SndFileChunk {sndChunkId} NewSndChunkReplica {server, replicaId, replicaKey, rcvIdsKeys} = do +createSndFileReplica db SndFileChunk {sndChunkId} = createSndFileReplica_ db sndChunkId + +createSndFileReplica_ :: DB.Connection -> Int64 -> NewSndChunkReplica -> IO () +createSndFileReplica_ db sndChunkId NewSndChunkReplica {server, replicaId, replicaKey, rcvIdsKeys} = do srvId <- createXFTPServer_ db server DB.execute db @@ -2575,50 +2698,69 @@ createSndFileReplica db SndFileChunk {sndChunkId} NewSndChunkReplica {server, re |] (rId, rcvId, rcvKey) -getNextSndChunkToUpload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Maybe SndFileChunk) +getNextSndChunkToUpload :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Either StoreError (Maybe SndFileChunk)) getNextSndChunkToUpload db server@ProtocolServer {host, port, keyHash} ttl = do - cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime - chunk_ <- - maybeFirstRow toChunk $ - DB.query - db - [sql| - SELECT - f.snd_file_id, f.snd_file_entity_id, f.user_id, f.num_recipients, f.prefix_path, - c.snd_file_chunk_id, c.chunk_no, c.chunk_offset, c.chunk_size, c.digest, - r.snd_file_chunk_replica_id, r.replica_id, r.replica_key, r.replica_status, r.delay, r.retries - FROM snd_file_chunk_replicas r - JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id - JOIN snd_file_chunks c ON c.snd_file_chunk_id = r.snd_file_chunk_id - JOIN snd_files f ON f.snd_file_id = c.snd_file_id - WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ? - AND r.replica_status = ? AND r.replica_number = 1 - AND (f.status = ? OR f.status = ?) AND f.deleted = 0 AND f.created_at >= ? - ORDER BY r.created_at ASC - LIMIT 1 - |] - (host, port, keyHash, SFRSCreated, SFSEncrypted, SFSUploading, cutoffTs) - forM chunk_ $ \chunk@SndFileChunk {replicas} -> do - replicas' <- forM replicas $ \replica@SndFileChunkReplica {sndChunkReplicaId} -> do - rcvIdsKeys <- getChunkReplicaRecipients_ db sndChunkReplicaId - pure (replica :: SndFileChunkReplica) {rcvIdsKeys} - pure (chunk {replicas = replicas'} :: SndFileChunk) + getWorkItem "snd_file_upload" getReplicaId getChunkData (markSndFileFailed db . snd) where - toChunk :: ((DBSndFileId, SndFileId, UserId, Int, FilePath) :. (Int64, Int, Int64, Word32, FileDigest) :. (Int64, ChunkReplicaId, C.APrivateSignKey, SndFileReplicaStatus, Maybe Int64, Int)) -> SndFileChunk - toChunk ((sndFileId, sndFileEntityId, userId, numRecipients, filePrefixPath) :. (sndChunkId, chunkNo, chunkOffset, chunkSize, digest) :. (sndChunkReplicaId, replicaId, replicaKey, replicaStatus, delay, retries)) = - let chunkSpec = XFTPChunkSpec {filePath = sndFileEncPath filePrefixPath, chunkOffset, chunkSize} - in SndFileChunk - { sndFileId, - sndFileEntityId, - userId, - numRecipients, - sndChunkId, - chunkNo, - chunkSpec, - digest, - filePrefixPath, - replicas = [SndFileChunkReplica {sndChunkReplicaId, server, replicaId, replicaKey, replicaStatus, delay, retries, rcvIdsKeys = []}] - } + getReplicaId :: IO (Maybe (Int64, DBSndFileId)) + getReplicaId = do + cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime + maybeFirstRow id $ + DB.query + db + [sql| + SELECT r.snd_file_chunk_replica_id, f.snd_file_id + FROM snd_file_chunk_replicas r + JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id + JOIN snd_file_chunks c ON c.snd_file_chunk_id = r.snd_file_chunk_id + JOIN snd_files f ON f.snd_file_id = c.snd_file_id + WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ? + AND r.replica_status = ? AND r.replica_number = 1 + AND (f.status = ? OR f.status = ?) AND f.deleted = 0 AND f.created_at >= ? + AND f.failed = 0 + ORDER BY r.created_at ASC + LIMIT 1 + |] + (host, port, keyHash, SFRSCreated, SFSEncrypted, SFSUploading, cutoffTs) + getChunkData :: (Int64, DBSndFileId) -> IO (Either StoreError SndFileChunk) + getChunkData (sndFileChunkReplicaId, _fileId) = do + chunk_ <- + firstRow toChunk SEFileNotFound $ + DB.query + db + [sql| + SELECT + f.snd_file_id, f.snd_file_entity_id, f.user_id, f.num_recipients, f.prefix_path, + c.snd_file_chunk_id, c.chunk_no, c.chunk_offset, c.chunk_size, c.digest, + r.snd_file_chunk_replica_id, r.replica_id, r.replica_key, r.replica_status, r.delay, r.retries + FROM snd_file_chunk_replicas r + JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id + JOIN snd_file_chunks c ON c.snd_file_chunk_id = r.snd_file_chunk_id + JOIN snd_files f ON f.snd_file_id = c.snd_file_id + WHERE r.snd_file_chunk_replica_id = ? + |] + (Only sndFileChunkReplicaId) + forM chunk_ $ \chunk@SndFileChunk {replicas} -> do + replicas' <- forM replicas $ \replica@SndFileChunkReplica {sndChunkReplicaId} -> do + rcvIdsKeys <- getChunkReplicaRecipients_ db sndChunkReplicaId + pure (replica :: SndFileChunkReplica) {rcvIdsKeys} + pure (chunk {replicas = replicas'} :: SndFileChunk) + where + toChunk :: ((DBSndFileId, SndFileId, UserId, Int, FilePath) :. (Int64, Int, Int64, Word32, FileDigest) :. (Int64, ChunkReplicaId, C.APrivateSignKey, SndFileReplicaStatus, Maybe Int64, Int)) -> SndFileChunk + toChunk ((sndFileId, sndFileEntityId, userId, numRecipients, filePrefixPath) :. (sndChunkId, chunkNo, chunkOffset, chunkSize, digest) :. (sndChunkReplicaId, replicaId, replicaKey, replicaStatus, delay, retries)) = + let chunkSpec = XFTPChunkSpec {filePath = sndFileEncPath filePrefixPath, chunkOffset, chunkSize} + in SndFileChunk + { sndFileId, + sndFileEntityId, + userId, + numRecipients, + sndChunkId, + chunkNo, + chunkSpec, + digest, + filePrefixPath, + replicas = [SndFileChunkReplica {sndChunkReplicaId, server, replicaId, replicaKey, replicaStatus, delay, retries, rcvIdsKeys = []}] + } updateSndChunkReplicaDelay :: DB.Connection -> Int64 -> Int64 -> IO () updateSndChunkReplicaDelay db replicaId delay = do @@ -2723,25 +2865,29 @@ getDeletedSndChunkReplica db deletedSndChunkReplicaId = let server = XFTPServer host port keyHash in DeletedSndChunkReplica {deletedSndChunkReplicaId, userId, server, replicaId, replicaKey, chunkDigest, delay, retries} -getNextDeletedSndChunkReplica :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Maybe DeletedSndChunkReplica) -getNextDeletedSndChunkReplica db ProtocolServer {host, port, keyHash} ttl = do - cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime - replicaId_ :: Maybe Int64 <- - maybeFirstRow fromOnly $ - DB.query - db - [sql| - SELECT r.deleted_snd_chunk_replica_id - FROM deleted_snd_chunk_replicas r - JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id - WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ? - AND r.created_at >= ? - ORDER BY r.created_at ASC LIMIT 1 - |] - (host, port, keyHash, cutoffTs) - case replicaId_ of - Nothing -> pure Nothing - Just replicaId -> eitherToMaybe <$> getDeletedSndChunkReplica db replicaId +getNextDeletedSndChunkReplica :: DB.Connection -> XFTPServer -> NominalDiffTime -> IO (Either StoreError (Maybe DeletedSndChunkReplica)) +getNextDeletedSndChunkReplica db ProtocolServer {host, port, keyHash} ttl = + getWorkItem "deleted replica" getReplicaId (getDeletedSndChunkReplica db) markReplicaFailed + where + getReplicaId :: IO (Maybe Int64) + getReplicaId = do + cutoffTs <- addUTCTime (-ttl) <$> getCurrentTime + maybeFirstRow fromOnly $ + DB.query + db + [sql| + SELECT r.deleted_snd_chunk_replica_id + FROM deleted_snd_chunk_replicas r + JOIN xftp_servers s ON s.xftp_server_id = r.xftp_server_id + WHERE s.xftp_host = ? AND s.xftp_port = ? AND s.xftp_key_hash = ? + AND r.created_at >= ? + AND failed = 0 + ORDER BY r.created_at ASC LIMIT 1 + |] + (host, port, keyHash, cutoffTs) + markReplicaFailed :: Int64 -> IO () + markReplicaFailed replicaId = do + DB.execute db "UPDATE deleted_snd_chunk_replicas SET failed = 1 WHERE deleted_snd_chunk_replica_id = ?" (Only replicaId) updateDeletedSndChunkReplicaDelay :: DB.Connection -> Int64 -> Int64 -> IO () updateDeletedSndChunkReplicaDelay db deletedSndChunkReplicaId delay = do diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs index db287f8ce..341869b85 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations.hs @@ -66,6 +66,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230722_indexes import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230814_indexes import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20230829_crypto_files import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231222_command_created_at +import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (dropPrefix, sumTypeJSON) import Simplex.Messaging.Transport.Client (TransportHost) @@ -100,7 +101,8 @@ schemaMigrations = ("m20230722_indexes", m20230722_indexes, Just down_m20230722_indexes), ("m20230814_indexes", m20230814_indexes, Just down_m20230814_indexes), ("m20230829_crypto_files", m20230829_crypto_files, Just down_m20230829_crypto_files), - ("m20231222_command_created_at", m20231222_command_created_at, Just down_m20231222_command_created_at) + ("m20231222_command_created_at", m20231222_command_created_at, Just down_m20231222_command_created_at), + ("m20231225_failed_work_items", m20231225_failed_work_items, Just down_m20231225_failed_work_items) ] -- | The list of migrations in ascending order by date diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20231225_failed_work_items.hs b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20231225_failed_work_items.hs new file mode 100644 index 000000000..f388070f5 --- /dev/null +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/M20231225_failed_work_items.hs @@ -0,0 +1,38 @@ +{-# LANGUAGE QuasiQuotes #-} + +module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20231225_failed_work_items where + +import Database.SQLite.Simple (Query) +import Database.SQLite.Simple.QQ (sql) + +m20231225_failed_work_items :: Query +m20231225_failed_work_items = + [sql| +ALTER TABLE snd_message_deliveries ADD COLUMN failed INTEGER DEFAULT 0; +ALTER TABLE commands ADD COLUMN failed INTEGER DEFAULT 0; +ALTER TABLE ntf_subscriptions ADD COLUMN ntf_failed INTEGER DEFAULT 0; +ALTER TABLE ntf_subscriptions ADD COLUMN smp_failed INTEGER DEFAULT 0; +ALTER TABLE rcv_files ADD COLUMN failed INTEGER DEFAULT 0; +ALTER TABLE snd_files ADD COLUMN failed INTEGER DEFAULT 0; +ALTER TABLE deleted_snd_chunk_replicas ADD COLUMN failed INTEGER DEFAULT 0; + +CREATE INDEX idx_rcv_files_status_created_at ON rcv_files(status, created_at); +CREATE INDEX idx_snd_files_status_created_at ON snd_files(status, created_at); +CREATE INDEX idx_snd_files_snd_file_entity_id ON snd_files(snd_file_entity_id); +|] + +down_m20231225_failed_work_items :: Query +down_m20231225_failed_work_items = + [sql| +DROP INDEX idx_rcv_files_status_created_at; +DROP INDEX idx_snd_files_status_created_at; +DROP INDEX idx_snd_files_snd_file_entity_id; + +ALTER TABLE snd_message_deliveries DROP COLUMN failed; +ALTER TABLE commands DROP COLUMN failed; +ALTER TABLE ntf_subscriptions DROP COLUMN ntf_failed; +ALTER TABLE ntf_subscriptions DROP COLUMN smp_failed; +ALTER TABLE rcv_files DROP COLUMN failed; +ALTER TABLE snd_files DROP COLUMN failed; +ALTER TABLE deleted_snd_chunk_replicas DROP COLUMN failed; +|] diff --git a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql index d917f0324..db80c50be 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql +++ b/src/Simplex/Messaging/Agent/Store/SQLite/Migrations/agent_schema.sql @@ -213,6 +213,8 @@ CREATE TABLE ntf_subscriptions( created_at TEXT NOT NULL DEFAULT(datetime('now')), updated_at TEXT NOT NULL DEFAULT(datetime('now')), smp_server_key_hash BLOB, + ntf_failed INTEGER DEFAULT 0, + smp_failed INTEGER DEFAULT 0, PRIMARY KEY(conn_id), FOREIGN KEY(smp_host, smp_port) REFERENCES servers(host, port) ON DELETE SET NULL ON UPDATE CASCADE, @@ -230,6 +232,7 @@ CREATE TABLE commands( agent_version INTEGER NOT NULL DEFAULT 1, server_key_hash BLOB, created_at TEXT NOT NULL DEFAULT('1970-01-01 00:00:00'), + failed INTEGER DEFAULT 0, FOREIGN KEY(host, port) REFERENCES servers ON DELETE RESTRICT ON UPDATE CASCADE ); @@ -238,6 +241,7 @@ CREATE TABLE snd_message_deliveries( conn_id BLOB NOT NULL REFERENCES connections ON DELETE CASCADE, snd_queue_id INTEGER NOT NULL, internal_id INTEGER NOT NULL, + failed INTEGER DEFAULT 0, FOREIGN KEY(conn_id, internal_id) REFERENCES messages ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED ); CREATE TABLE sqlite_sequence(name,seq); @@ -274,6 +278,7 @@ CREATE TABLE rcv_files( updated_at TEXT NOT NULL DEFAULT(datetime('now')), save_file_key BLOB, save_file_nonce BLOB, + failed INTEGER DEFAULT 0, UNIQUE(rcv_file_entity_id) ); CREATE TABLE rcv_file_chunks( @@ -316,7 +321,8 @@ CREATE TABLE snd_files( updated_at TEXT NOT NULL DEFAULT(datetime('now')) , src_file_key BLOB, - src_file_nonce BLOB + src_file_nonce BLOB, + failed INTEGER DEFAULT 0 ); CREATE TABLE snd_file_chunks( snd_file_chunk_id INTEGER PRIMARY KEY, @@ -360,6 +366,8 @@ CREATE TABLE deleted_snd_chunk_replicas( retries INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL DEFAULT(datetime('now')), updated_at TEXT NOT NULL DEFAULT(datetime('now')) + , + failed INTEGER DEFAULT 0 ); CREATE TABLE encrypted_rcv_message_hashes( encrypted_rcv_message_hash_id INTEGER PRIMARY KEY, @@ -486,3 +494,6 @@ CREATE INDEX idx_commands_server_commands ON commands( created_at, command_id ); +CREATE INDEX idx_rcv_files_status_created_at ON rcv_files(status, created_at); +CREATE INDEX idx_snd_files_status_created_at ON snd_files(status, created_at); +CREATE INDEX idx_snd_files_snd_file_entity_id ON snd_files(snd_file_entity_id); diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index a00a6985b..9e4fceab0 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -44,7 +44,7 @@ agentTests (ATransport t) = do describe "SQLite store" storeTests describe "Migration tests" migrationTests describe "SMP agent protocol syntax" $ syntaxTests t - describe "Establishing duplex connection" $ do + describe "Establishing duplex connection (via agent protocol)" $ do it "should connect via one server and one agent" $ do smpAgentTest2_1_1 $ testDuplexConnection t it "should connect via one server and one agent (random IDs)" $ do diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 067b1b0bc..0977133f9 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -177,6 +177,8 @@ functionalAPITests t = do testMatrix2 t runAgentClientTest it "should connect when server with multiple identities is stored" $ withSmpServer t testServerMultipleIdentities + it "should connect with two peers" $ + withSmpServer t testAgentClient3 describe "Establishing duplex connection v2, different Ratchet versions" $ testRatchetMatrix2 t runAgentClientTest describe "Establish duplex connection via contact address" $ @@ -417,6 +419,32 @@ runAgentClientTest alice bob baseId = do where msgId = subtract baseId +testAgentClient3 :: HasCallStack => IO () +testAgentClient3 = do + a <- getSMPAgentClient' agentCfg initAgentServers testDB + b <- getSMPAgentClient' agentCfg initAgentServers testDB2 + c <- getSMPAgentClient' agentCfg initAgentServers testDB3 + runRight_ $ do + (aIdForB, bId) <- makeConnection a b + (aIdForC, cId) <- makeConnection a c + + 4 <- sendMessage a bId SMP.noMsgFlags "b4" + 4 <- sendMessage a cId SMP.noMsgFlags "c4" + 5 <- sendMessage a bId SMP.noMsgFlags "b5" + 5 <- sendMessage a cId SMP.noMsgFlags "c5" + get a =##> \case ("", connId, SENT 4) -> connId == bId || connId == cId; _ -> False + get a =##> \case ("", connId, SENT 4) -> connId == bId || connId == cId; _ -> False + get a =##> \case ("", connId, SENT 5) -> connId == bId || connId == cId; _ -> False + get a =##> \case ("", connId, SENT 5) -> connId == bId || connId == cId; _ -> False + get b =##> \case ("", connId, Msg "b4") -> connId == aIdForB; _ -> False + ackMessage b aIdForB 4 Nothing + get b =##> \case ("", connId, Msg "b5") -> connId == aIdForB; _ -> False + ackMessage b aIdForB 5 Nothing + get c =##> \case ("", connId, Msg "c4") -> connId == aIdForC; _ -> False + ackMessage c aIdForC 4 Nothing + get c =##> \case ("", connId, Msg "c5") -> connId == aIdForC; _ -> False + ackMessage c aIdForC 5 Nothing + runAgentClientContactTest :: HasCallStack => AgentClient -> AgentClient -> AgentMsgId -> IO () runAgentClientContactTest alice bob baseId = do runRight_ $ do diff --git a/tests/AgentTests/SQLiteTests.hs b/tests/AgentTests/SQLiteTests.hs index af14644ea..92370c05b 100644 --- a/tests/AgentTests/SQLiteTests.hs +++ b/tests/AgentTests/SQLiteTests.hs @@ -22,9 +22,14 @@ import qualified Data.Text as T import Data.Text.Encoding (encodeUtf8) import Data.Time import Data.Word (Word32) +import Database.SQLite.Simple (Only (..)) import qualified Database.SQLite.Simple as SQL import Database.SQLite.Simple.QQ (sql) import SMPClient (testKeyHash) +import Simplex.FileTransfer.Client (XFTPChunkSpec (..)) +import Simplex.FileTransfer.Description +import Simplex.FileTransfer.Protocol +import Simplex.FileTransfer.Types import Simplex.Messaging.Agent.Client () import Simplex.Messaging.Agent.Protocol import Simplex.Messaging.Agent.Store @@ -32,6 +37,9 @@ import Simplex.Messaging.Agent.Store.SQLite import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations import qualified Simplex.Messaging.Crypto as C +import Simplex.Messaging.Crypto.File (CryptoFile (..)) +import Simplex.Messaging.Encoding.String (StrEncoding (..)) +import Simplex.Messaging.Protocol (SubscriptionMode (..)) import qualified Simplex.Messaging.Protocol as SMP import System.Random import Test.Hspec @@ -109,6 +117,16 @@ storeTests = do testCreateRcvMsg testCreateSndMsg testCreateRcvAndSndMsgs + describe "Work items" $ do + it "should getPendingQueueMsg" testGetPendingQueueMsg + it "should getPendingServerCommand" testGetPendingServerCommand + it "should getNextRcvChunkToDownload" testGetNextRcvChunkToDownload + it "should getNextRcvFileToDecrypt" testGetNextRcvFileToDecrypt + it "should getNextSndFileToPrepare" testGetNextSndFileToPrepare + it "should getNextSndChunkToUpload" testGetNextSndChunkToUpload + it "should getNextDeletedSndChunkReplica" testGetNextDeletedSndChunkReplica + it "should markNtfSubActionNtfFailed_" testMarkNtfSubActionNtfFailed + it "should markNtfSubActionSMPFailed_" testMarkNtfSubActionSMPFailed describe "open/close store" $ do it "should close and re-open" testCloseReopenStore it "should close and re-open encrypted store" testCloseReopenEncryptedStore @@ -163,12 +181,15 @@ testPrivDhKey = "MC4CAQAwBQYDK2VuBCIEINCzbVFaCiYHoYncxNY8tSIfn0pXcIAhLBfFc0m+gOp testDhSecret :: C.DhSecretX25519 testDhSecret = "01234567890123456789012345678901" +smpServer1 :: SMPServer +smpServer1 = SMPServer "smp.simplex.im" "5223" testKeyHash + rcvQueue1 :: NewRcvQueue rcvQueue1 = RcvQueue { userId = 1, connId = "conn1", - server = SMPServer "smp.simplex.im" "5223" testKeyHash, + server = smpServer1, rcvId = "1234", rcvPrivateKey = testPrivateSignKey, rcvDhSecret = testDhSecret, @@ -190,7 +211,7 @@ sndQueue1 = SndQueue { userId = 1, connId = "conn1", - server = SMPServer "smp.simplex.im" "5223" testKeyHash, + server = smpServer1, sndId = "3456", sndPublicKey = Nothing, sndPrivateKey = testPrivateSignKey, @@ -478,23 +499,25 @@ mkSndMsgData internalId internalSndId internalHash = prevMsgHash = internalHash } -testCreateSndMsg_ :: DB.Connection -> PrevSndMsgHash -> ConnId -> SndMsgData -> Expectation -testCreateSndMsg_ db expectedPrevHash connId sndMsgData@SndMsgData {..} = do +testCreateSndMsg_ :: DB.Connection -> PrevSndMsgHash -> ConnId -> SndQueue -> SndMsgData -> Expectation +testCreateSndMsg_ db expectedPrevHash connId sq sndMsgData@SndMsgData {..} = do updateSndIds db connId `shouldReturn` (internalId, internalSndId, expectedPrevHash) createSndMsg db connId sndMsgData `shouldReturn` () + createSndMsgDelivery db connId sq internalId + `shouldReturn` () testCreateSndMsg :: SpecWith SQLiteStore testCreateSndMsg = it "should create a SndMsg and return InternalId and PrevSndMsgHash" $ \st -> do g <- C.newRandom let ConnData {connId} = cData1 - _ <- withTransaction st $ \db -> do + Right (_, sq) <- withTransaction st $ \db -> do createSndConn db g cData1 sndQueue1 withTransaction st $ \db -> do - testCreateSndMsg_ db "" connId $ mkSndMsgData (InternalId 1) (InternalSndId 1) "hash_dummy" - testCreateSndMsg_ db "hash_dummy" connId $ mkSndMsgData (InternalId 2) (InternalSndId 2) "new_hash_dummy" + testCreateSndMsg_ db "" connId sq $ mkSndMsgData (InternalId 1) (InternalSndId 1) "hash_dummy" + testCreateSndMsg_ db "hash_dummy" connId sq $ mkSndMsgData (InternalId 2) (InternalSndId 2) "new_hash_dummy" testCreateRcvAndSndMsgs :: SpecWith SQLiteStore testCreateRcvAndSndMsgs = @@ -504,13 +527,13 @@ testCreateRcvAndSndMsgs = g <- C.newRandom createRcvConn db g cData1 rcvQueue1 SCMInvitation withTransaction st $ \db -> do - _ <- upgradeRcvConnToDuplex db "conn1" sndQueue1 + Right sq <- upgradeRcvConnToDuplex db "conn1" sndQueue1 testCreateRcvMsg_ db 0 "" connId rq $ mkRcvMsgData (InternalId 1) (InternalRcvId 1) 1 "1" "rcv_hash_1" testCreateRcvMsg_ db 1 "rcv_hash_1" connId rq $ mkRcvMsgData (InternalId 2) (InternalRcvId 2) 2 "2" "rcv_hash_2" - testCreateSndMsg_ db "" connId $ mkSndMsgData (InternalId 3) (InternalSndId 1) "snd_hash_1" + testCreateSndMsg_ db "" connId sq $ mkSndMsgData (InternalId 3) (InternalSndId 1) "snd_hash_1" testCreateRcvMsg_ db 2 "rcv_hash_2" connId rq $ mkRcvMsgData (InternalId 4) (InternalRcvId 3) 3 "3" "rcv_hash_3" - testCreateSndMsg_ db "snd_hash_1" connId $ mkSndMsgData (InternalId 5) (InternalSndId 2) "snd_hash_2" - testCreateSndMsg_ db "snd_hash_2" connId $ mkSndMsgData (InternalId 6) (InternalSndId 3) "snd_hash_3" + testCreateSndMsg_ db "snd_hash_1" connId sq $ mkSndMsgData (InternalId 5) (InternalSndId 2) "snd_hash_2" + testCreateSndMsg_ db "snd_hash_2" connId sq $ mkSndMsgData (InternalId 6) (InternalSndId 3) "snd_hash_3" testCloseReopenStore :: IO () testCloseReopenStore = do @@ -562,3 +585,201 @@ hasMigrations st = getMigrations st `shouldReturn` True errorGettingMigrations :: SQLiteStore -> Expectation errorGettingMigrations st = getMigrations st `shouldThrow` \(e :: SomeException) -> "ErrorMisuse" `isInfixOf` show e + +testGetPendingQueueMsg :: SQLiteStore -> Expectation +testGetPendingQueueMsg st = do + g <- C.newRandom + withTransaction st $ \db -> do + Right (connId, sq) <- createSndConn db g cData1 {connId = ""} sndQueue1 + Right Nothing <- getPendingQueueMsg db connId sq + testCreateSndMsg_ db "" connId sq $ mkSndMsgData (InternalId 1) (InternalSndId 1) "hash_dummy" + DB.execute db "UPDATE messages SET msg_type = cast('bad' as blob) WHERE conn_id = ? AND internal_id = ?" (connId, 1 :: Int) + testCreateSndMsg_ db "hash_dummy" connId sq $ mkSndMsgData (InternalId 2) (InternalSndId 2) "new_hash_dummy" + + Left e <- getPendingQueueMsg db connId sq + show e `shouldContain` "bad AgentMessageType" + DB.query_ db "SELECT conn_id, internal_id FROM snd_message_deliveries WHERE failed = 1" `shouldReturn` [(connId, 1 :: Int)] + + Right (Just (Nothing, PendingMsgData {msgId})) <- getPendingQueueMsg db connId sq + msgId `shouldBe` InternalId 2 + +testGetPendingServerCommand :: SQLiteStore -> Expectation +testGetPendingServerCommand st = do + g <- C.newRandom + withTransaction st $ \db -> do + Right Nothing <- getPendingServerCommand db Nothing + Right connId <- createNewConn db g cData1 {connId = ""} SCMInvitation + Right () <- createCommand db "1" connId Nothing command + corruptCmd db "1" connId + Right () <- createCommand db "2" connId Nothing command + + Left e <- getPendingServerCommand db Nothing + show e `shouldContain` "bad AgentCmdType" + DB.query_ db "SELECT conn_id, corr_id FROM commands WHERE failed = 1" `shouldReturn` [(connId, "1" :: ByteString)] + + Right (Just PendingCommand {corrId}) <- getPendingServerCommand db Nothing + corrId `shouldBe` "2" + + Right _ <- updateNewConnRcv db connId rcvQueue1 + Right Nothing <- getPendingServerCommand db $ Just smpServer1 + Right () <- createCommand db "3" connId (Just smpServer1) command + corruptCmd db "3" connId + Right () <- createCommand db "4" connId (Just smpServer1) command + + Left e' <- getPendingServerCommand db (Just smpServer1) + show e' `shouldContain` "bad AgentCmdType" + DB.query_ db "SELECT conn_id, corr_id FROM commands WHERE failed = 1" `shouldReturn` [(connId, "1" :: ByteString), (connId, "3" :: ByteString)] + + Right (Just PendingCommand {corrId = corrId'}) <- getPendingServerCommand db (Just smpServer1) + corrId' `shouldBe` "4" + where + command = AClientCommand $ APC SAEConn $ NEW True (ACM SCMInvitation) SMSubscribe + corruptCmd :: DB.Connection -> ByteString -> ConnId -> IO () + corruptCmd db corrId connId = DB.execute db "UPDATE commands SET command = cast('bad' as blob) WHERE conn_id = ? AND corr_id = ?" (connId, corrId) + +xftpServer1 :: SMP.XFTPServer +xftpServer1 = SMP.ProtocolServer SMP.SPXFTP "xftp.simplex.im" "5223" testKeyHash + +rcvFileDescr1 :: FileDescription 'FRecipient +rcvFileDescr1 = + FileDescription + { party = SFRecipient, + size = FileSize $ mb 26, + digest = FileDigest "abc", + key = testFileSbKey, + nonce = testFileCbNonce, + chunkSize = defaultChunkSize, + chunks = + [ FileChunk + { chunkNo = 1, + digest = chunkDigest, + chunkSize = defaultChunkSize, + replicas = [FileChunkReplica {server = xftpServer1, replicaId, replicaKey = testFileReplicaKey}] + } + ] + } + where + defaultChunkSize = FileSize $ mb 8 + replicaId = ChunkReplicaId "abc" + chunkDigest = FileDigest "ghi" + +testFileSbKey :: C.SbKey +testFileSbKey = either error id $ strDecode "00n8p1tJq5E-SGnHcYTOrS4A9I07gTA_WFD6MTFFFOY=" + +testFileCbNonce :: C.CbNonce +testFileCbNonce = either error id $ strDecode "dPSF-wrQpDiK_K6sYv0BDBZ9S4dg-jmu" + +testFileReplicaKey :: C.APrivateSignKey +testFileReplicaKey = C.APrivateSignKey C.SEd25519 "MC4CAQAwBQYDK2VwBCIEIDfEfevydXXfKajz3sRkcQ7RPvfWUPoq6pu1TYHV1DEe" + +testGetNextRcvChunkToDownload :: SQLiteStore -> Expectation +testGetNextRcvChunkToDownload st = do + g <- C.newRandom + withTransaction st $ \db -> do + Right Nothing <- getNextRcvChunkToDownload db xftpServer1 86400 + + Right _ <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing) + DB.execute_ db "UPDATE rcv_file_chunk_replicas SET replica_key = cast('bad' as blob) WHERE rcv_file_chunk_replica_id = 1" + Right fId2 <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing) + + Left e <- getNextRcvChunkToDownload db xftpServer1 86400 + show e `shouldContain` "ConversionFailed" + DB.query_ db "SELECT rcv_file_id FROM rcv_files WHERE failed = 1" `shouldReturn` [Only (1 :: Int)] + + Right (Just RcvFileChunk {rcvFileEntityId}) <- getNextRcvChunkToDownload db xftpServer1 86400 + rcvFileEntityId `shouldBe` fId2 + +testGetNextRcvFileToDecrypt :: SQLiteStore -> Expectation +testGetNextRcvFileToDecrypt st = do + g <- C.newRandom + withTransaction st $ \db -> do + Right Nothing <- getNextRcvFileToDecrypt db 86400 + + Right _ <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing) + DB.execute_ db "UPDATE rcv_files SET status = 'received' WHERE rcv_file_id = 1" + DB.execute_ db "UPDATE rcv_file_chunk_replicas SET replica_key = cast('bad' as blob) WHERE rcv_file_chunk_replica_id = 1" + Right fId2 <- createRcvFile db g 1 rcvFileDescr1 "filepath" "filepath" (CryptoFile "filepath" Nothing) + DB.execute_ db "UPDATE rcv_files SET status = 'received' WHERE rcv_file_id = 2" + + Left e <- getNextRcvFileToDecrypt db 86400 + show e `shouldContain` "ConversionFailed" + DB.query_ db "SELECT rcv_file_id FROM rcv_files WHERE failed = 1" `shouldReturn` [Only (1 :: Int)] + + Right (Just RcvFile {rcvFileEntityId}) <- getNextRcvFileToDecrypt db 86400 + rcvFileEntityId `shouldBe` fId2 + +testGetNextSndFileToPrepare :: SQLiteStore -> Expectation +testGetNextSndFileToPrepare st = do + g <- C.newRandom + withTransaction st $ \db -> do + Right Nothing <- getNextSndFileToPrepare db 86400 + + Right _ <- createSndFile db g 1 (CryptoFile "filepath" Nothing) 1 "filepath" testFileSbKey testFileCbNonce + DB.execute_ db "UPDATE snd_files SET status = 'new', num_recipients = 'bad' WHERE snd_file_id = 1" + Right fId2 <- createSndFile db g 1 (CryptoFile "filepath" Nothing) 1 "filepath" testFileSbKey testFileCbNonce + DB.execute_ db "UPDATE snd_files SET status = 'new' WHERE snd_file_id = 2" + + Left e <- getNextSndFileToPrepare db 86400 + show e `shouldContain` "ConversionFailed" + DB.query_ db "SELECT snd_file_id FROM snd_files WHERE failed = 1" `shouldReturn` [Only (1 :: Int)] + + Right (Just SndFile {sndFileEntityId}) <- getNextSndFileToPrepare db 86400 + sndFileEntityId `shouldBe` fId2 + +newSndChunkReplica1 :: NewSndChunkReplica +newSndChunkReplica1 = + NewSndChunkReplica + { server = xftpServer1, + replicaId = ChunkReplicaId "abc", + replicaKey = testFileReplicaKey, + rcvIdsKeys = [(ChunkReplicaId "abc", testFileReplicaKey)] + } + +testGetNextSndChunkToUpload :: SQLiteStore -> Expectation +testGetNextSndChunkToUpload st = do + g <- C.newRandom + withTransaction st $ \db -> do + Right Nothing <- getNextSndChunkToUpload db xftpServer1 86400 + + -- create file 1 + Right _ <- createSndFile db g 1 (CryptoFile "filepath" Nothing) 1 "filepath" testFileSbKey testFileCbNonce + updateSndFileEncrypted db 1 (FileDigest "abc") [(XFTPChunkSpec "filepath" 1 1, FileDigest "ghi")] + createSndFileReplica_ db 1 newSndChunkReplica1 + DB.execute_ db "UPDATE snd_files SET num_recipients = 'bad' WHERE snd_file_id = 1" + -- create file 2 + Right fId2 <- createSndFile db g 1 (CryptoFile "filepath" Nothing) 1 "filepath" testFileSbKey testFileCbNonce + updateSndFileEncrypted db 2 (FileDigest "abc") [(XFTPChunkSpec "filepath" 1 1, FileDigest "ghi")] + createSndFileReplica_ db 2 newSndChunkReplica1 + + Left e <- getNextSndChunkToUpload db xftpServer1 86400 + show e `shouldContain` "ConversionFailed" + DB.query_ db "SELECT snd_file_id FROM snd_files WHERE failed = 1" `shouldReturn` [Only (1 :: Int)] + + Right (Just SndFileChunk {sndFileEntityId}) <- getNextSndChunkToUpload db xftpServer1 86400 + sndFileEntityId `shouldBe` fId2 + +testGetNextDeletedSndChunkReplica :: SQLiteStore -> Expectation +testGetNextDeletedSndChunkReplica st = do + withTransaction st $ \db -> do + Right Nothing <- getNextDeletedSndChunkReplica db xftpServer1 86400 + + createDeletedSndChunkReplica db 1 (FileChunkReplica xftpServer1 (ChunkReplicaId "abc") testFileReplicaKey) (FileDigest "ghi") + DB.execute_ db "UPDATE deleted_snd_chunk_replicas SET retries = 'bad' WHERE deleted_snd_chunk_replica_id = 1" + createDeletedSndChunkReplica db 1 (FileChunkReplica xftpServer1 (ChunkReplicaId "abc") testFileReplicaKey) (FileDigest "ghi") + + Left e <- getNextDeletedSndChunkReplica db xftpServer1 86400 + show e `shouldContain` "ConversionFailed" + DB.query_ db "SELECT deleted_snd_chunk_replica_id FROM deleted_snd_chunk_replicas WHERE failed = 1" `shouldReturn` [Only (1 :: Int)] + + Right (Just DeletedSndChunkReplica {deletedSndChunkReplicaId}) <- getNextDeletedSndChunkReplica db xftpServer1 86400 + deletedSndChunkReplicaId `shouldBe` 2 + +testMarkNtfSubActionNtfFailed :: SQLiteStore -> Expectation +testMarkNtfSubActionNtfFailed st = do + withTransaction st $ \db -> do + markNtfSubActionNtfFailed_ db "abc" + +testMarkNtfSubActionSMPFailed :: SQLiteStore -> Expectation +testMarkNtfSubActionSMPFailed st = do + withTransaction st $ \db -> do + markNtfSubActionSMPFailed_ db "abc"