mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-18 22:05:19 +00:00
add replica uploading state
This commit is contained in:
committed by
Evgeny Poberezkin
parent
07ebf332db
commit
7068213aa6
@@ -61,6 +61,7 @@ import Simplex.Messaging.Agent.Client
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
import Simplex.Messaging.Agent.Protocol
|
||||
import Simplex.Messaging.Agent.RetryInterval
|
||||
import Simplex.Messaging.Agent.Store (StoreError (SEInternal))
|
||||
import Simplex.Messaging.Agent.Store.SQLite
|
||||
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
@@ -479,12 +480,17 @@ runXFTPSndWorker c srv Worker {doWork} = do
|
||||
unlessM (doesFileExist fsFilePath) $ throwError $ INTERNAL "encrypted file doesn't exist on upload"
|
||||
let chunkSpec' = chunkSpec {filePath = fsFilePath} :: XFTPChunkSpec
|
||||
atomically $ assertAgentForeground c
|
||||
liftIO . putStrLn $ "Uploading " <> show (sndFileId, chunkSpec)
|
||||
withStore c $ \db -> do
|
||||
ok <- updateSndChunkReplicaStatus db sndChunkReplicaId SFRSCreated SFRSUploading
|
||||
if ok then pure (Right ()) else pure (Left $ SEInternal "file already uploading")
|
||||
agentXFTPUploadChunk c userId chunkDigest replica' chunkSpec'
|
||||
atomically $ waitUntilForeground c
|
||||
-- liftIO $ putStrLn $ "uploaded: " <> show chunkOffset
|
||||
sf@SndFile {sndFileEntityId, prefixPath, chunks} <- withStore c $ \db -> do
|
||||
updateSndChunkReplicaStatus db sndChunkReplicaId SFRSUploaded
|
||||
getSndFile db sndFileId
|
||||
liftIO . putStrLn $ "Setting SFRSUploaded for " <> show (sndFileId, chunkSpec)
|
||||
ok <- updateSndChunkReplicaStatus db sndChunkReplicaId SFRSUploading SFRSUploaded
|
||||
if ok then getSndFile db sndFileId else pure (Left $ SEInternal "file upload didn't start")
|
||||
let uploaded = uploadedSize chunks
|
||||
total = totalSize chunks
|
||||
complete = all chunkUploaded chunks
|
||||
|
||||
@@ -209,6 +209,7 @@ data SndFileChunkReplica = SndFileChunkReplica
|
||||
|
||||
data SndFileReplicaStatus
|
||||
= SFRSCreated
|
||||
| SFRSUploading
|
||||
| SFRSUploaded
|
||||
deriving (Eq, Show)
|
||||
|
||||
@@ -219,10 +220,12 @@ instance ToField SndFileReplicaStatus where toField = toField . textEncode
|
||||
instance TextEncoding SndFileReplicaStatus where
|
||||
textDecode = \case
|
||||
"created" -> Just SFRSCreated
|
||||
"uploading" -> Just SFRSUploading
|
||||
"uploaded" -> Just SFRSUploaded
|
||||
_ -> Nothing
|
||||
textEncode = \case
|
||||
SFRSCreated -> "created"
|
||||
SFRSUploading -> "uploading"
|
||||
SFRSUploaded -> "uploaded"
|
||||
|
||||
data DeletedSndChunkReplica = DeletedSndChunkReplica
|
||||
|
||||
@@ -2869,10 +2869,13 @@ addSndChunkReplicaRecipients db r@SndFileChunkReplica {sndChunkReplicaId} rcvIds
|
||||
rcvIdsKeys' <- getChunkReplicaRecipients_ db sndChunkReplicaId
|
||||
pure (r :: SndFileChunkReplica) {rcvIdsKeys = rcvIdsKeys'}
|
||||
|
||||
updateSndChunkReplicaStatus :: DB.Connection -> Int64 -> SndFileReplicaStatus -> IO ()
|
||||
updateSndChunkReplicaStatus db replicaId status = do
|
||||
updateSndChunkReplicaStatus :: DB.Connection -> Int64 -> SndFileReplicaStatus -> SndFileReplicaStatus -> IO Bool
|
||||
updateSndChunkReplicaStatus db@DB.Connection {conn} replicaId old new = do
|
||||
cur <- DB.query db "SELECT replica_status FROM snd_file_chunk_replicas WHERE snd_file_chunk_replica_id = ? AND replica_status = ?" (replicaId, old)
|
||||
print (replicaId, map fromOnly cur :: [SndFileReplicaStatus])
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute db "UPDATE snd_file_chunk_replicas SET replica_status = ?, updated_at = ? WHERE snd_file_chunk_replica_id = ?" (status, updatedAt, replicaId)
|
||||
DB.execute db "UPDATE snd_file_chunk_replicas SET replica_status = ?, updated_at = ? WHERE snd_file_chunk_replica_id = ? AND replica_status = ?" (new, updatedAt, replicaId, old)
|
||||
(> 0) <$> SQL.changes conn
|
||||
|
||||
getPendingSndFilesServers :: DB.Connection -> NominalDiffTime -> IO [XFTPServer]
|
||||
getPendingSndFilesServers db ttl = do
|
||||
|
||||
Reference in New Issue
Block a user