core: fix asynchronous file transfer (#572)

This commit is contained in:
JRoberts
2022-04-26 12:52:41 +04:00
committed by GitHub
parent f02dcc851e
commit 645587431d
3 changed files with 138 additions and 39 deletions

View File

@@ -879,7 +879,7 @@ subscribeUserConnections user@User {userId} = do
threadDelay 1000000
l <- asks chatLock
a <- asks smpAgent
unless (fileStatus == FSNew) . unlessM (isFileActive fileId sndFiles) $
when (fileStatus == FSConnected) . unlessM (isFileActive fileId sndFiles) $
withAgentLock a . withLock l $
sendFileChunk ft
subscribeRcvFile ft@RcvFileTransfer {fileStatus} =
@@ -1675,8 +1675,8 @@ cancelRcvFileTransfer ft@RcvFileTransfer {fileId, fileStatus} = do
updateRcvFileStatus st ft FSCancelled
deleteRcvFileChunks st ft
case fileStatus of
RFSAccepted RcvFileInfo {agentConnId = AgentConnId acId} -> withAgent (`suspendConnection` acId)
RFSConnected RcvFileInfo {agentConnId = AgentConnId acId} -> withAgent (`suspendConnection` acId)
RFSAccepted RcvFileInfo {agentConnId = AgentConnId acId} -> withAgent (`deleteConnection` acId)
RFSConnected RcvFileInfo {agentConnId = AgentConnId acId} -> withAgent (`deleteConnection` acId)
_ -> pure ()
cancelSndFileTransfer :: ChatMonad m => SndFileTransfer -> m ()
@@ -1687,7 +1687,7 @@ cancelSndFileTransfer ft@SndFileTransfer {agentConnId = AgentConnId acId, fileSt
deleteSndFileChunks st ft
withAgent $ \a -> do
void (sendMessage a acId $ smpEncode FileChunkCancel) `catchError` \_ -> pure ()
suspendConnection a acId
deleteConnection a acId
closeFileHandle :: ChatMonad m => Int64 -> (ChatController -> TVar (Map Int64 Handle)) -> m ()
closeFileHandle fileId files = do

View File

@@ -8,7 +8,7 @@
module ChatClient where
import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, threadDelay)
import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread)
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception (bracket, bracket_)
@@ -81,16 +81,16 @@ cfg =
testView = True
}
createTestChat :: Int -> Profile -> IO TestCC
createTestChat dbNumber profile = do
let dbFilePrefix = testDBPrefix <> show dbNumber
createTestChat :: String -> Profile -> IO TestCC
createTestChat dbPrefix profile = do
let dbFilePrefix = testDBPrefix <> dbPrefix
st <- createStore (dbFilePrefix <> "_chat.db") 1 False
Right user <- runExceptT $ createUser st profile True
startTestChat_ st dbFilePrefix user
startTestChat :: Int -> IO TestCC
startTestChat dbNumber = do
let dbFilePrefix = testDBPrefix <> show dbNumber
startTestChat :: String -> IO TestCC
startTestChat dbPrefix = do
let dbFilePrefix = testDBPrefix <> dbPrefix
st <- createStore (dbFilePrefix <> "_chat.db") 1 False
Just user <- find activeUser <$> getUsers st
startTestChat_ st dbFilePrefix user
@@ -107,16 +107,15 @@ startTestChat_ st dbFilePrefix user = do
stopTestChat :: TestCC -> IO ()
stopTestChat TestCC {chatController = cc, chatAsync, termAsync} = do
threadDelay 500000
stopChatController cc
uninterruptibleCancel termAsync
uninterruptibleCancel chatAsync
withNewTestChat :: Int -> Profile -> (TestCC -> IO a) -> IO a
withNewTestChat dbNumber profile = bracket (createTestChat dbNumber profile) stopTestChat
withNewTestChat :: String -> Profile -> (TestCC -> IO a) -> IO a
withNewTestChat dbPrefix profile = bracket (createTestChat dbPrefix profile) (\cc -> cc <// 100000 >> stopTestChat cc)
withTestChat :: Int -> (TestCC -> IO a) -> IO a
withTestChat dbNumber = bracket (startTestChat dbNumber) stopTestChat
withTestChat :: String -> (TestCC -> IO a) -> IO a
withTestChat dbPrefix = bracket (startTestChat dbPrefix) (\cc -> cc <// 100000 >> stopTestChat cc)
readTerminalOutput :: VirtualTerminal -> TQueue String -> IO ()
readTerminalOutput t termQ = do
@@ -156,7 +155,7 @@ testChatN ps test = withTmpFiles $ do
where
getTestCCs :: [(Profile, Int)] -> [TestCC] -> IO [TestCC]
getTestCCs [] tcs = pure tcs
getTestCCs ((p, db) : envs') tcs = (:) <$> createTestChat db p <*> getTestCCs envs' tcs
getTestCCs ((p, db) : envs') tcs = (:) <$> createTestChat (show db) p <*> getTestCCs envs' tcs
(<//) :: TestCC -> Int -> Expectation
(<//) cc t = timeout t (getTermLine cc) `shouldReturn` Nothing
@@ -205,7 +204,7 @@ serverCfg =
{ transports = [(serverPort, transport @TLS)],
tbqSize = 1,
serverTbqSize = 1,
msgQueueQuota = 4,
msgQueueQuota = 16,
queueIdBytes = 12,
msgIdBytes = 6,
storeLogFile = Nothing,

View File

@@ -81,10 +81,13 @@ chatTests = do
it "delete connection requests when contact link deleted" testDeleteConnectionRequests
describe "SMP servers" $
it "get and set SMP servers" testGetSetSMPServers
describe "Async connection handshake" $ do
it "should connect when initiating client goes offline" testAsyncInitiatingOffline
it "should connect when accepting client goes offline" testAsyncAcceptingOffline
it "should connect when clients are never simultaneously online" testAsyncNeverTogetherOnline
describe "async connection handshake" $ do
it "connect when initiating client goes offline" testAsyncInitiatingOffline
it "connect when accepting client goes offline" testAsyncAcceptingOffline
it "connect, fully asynchronous (when clients are never simultaneously online)" testFullAsync
xdescribe "async sending and receiving files" $ do
it "send and receive file, fully asynchronous" testAsyncFileTransfer
it "send and receive file to group, fully asynchronous" testAsyncGroupFileTransfer
testAddContact :: IO ()
testAddContact =
@@ -1748,51 +1751,148 @@ testGetSetSMPServers =
testAsyncInitiatingOffline :: IO ()
testAsyncInitiatingOffline = withTmpFiles $ do
inv <- withNewTestChat 1 aliceProfile $ \alice -> do
inv <- withNewTestChat "alice" aliceProfile $ \alice -> do
alice ##> "/c"
getInvitation alice
withNewTestChat 2 bobProfile $ \bob -> do
withNewTestChat "bob" bobProfile $ \bob -> do
bob ##> ("/c " <> inv)
bob <## "confirmation sent!"
withTestChat 1 $ \alice -> do
withTestChat "alice" $ \alice -> do
concurrently_
(bob <## "alice (Alice): contact is connected")
(alice <## "bob (Bob): contact is connected")
testAsyncAcceptingOffline :: IO ()
testAsyncAcceptingOffline = withTmpFiles $ do
inv <- withNewTestChat 1 aliceProfile $ \alice -> do
inv <- withNewTestChat "alice" aliceProfile $ \alice -> do
alice ##> "/c"
getInvitation alice
withNewTestChat 2 bobProfile $ \bob -> do
withNewTestChat "bob" bobProfile $ \bob -> do
bob ##> ("/c " <> inv)
bob <## "confirmation sent!"
withTestChat 1 $ \alice ->
withTestChat 2 $ \bob ->
withTestChat "alice" $ \alice ->
withTestChat "bob" $ \bob ->
concurrently_
(bob <## "alice (Alice): contact is connected")
(alice <## "bob (Bob): contact is connected")
testAsyncNeverTogetherOnline :: IO ()
testAsyncNeverTogetherOnline = withTmpFiles $ do
inv <- withNewTestChat 1 aliceProfile $ \alice -> do
testFullAsync :: IO ()
testFullAsync = withTmpFiles $ do
inv <- withNewTestChat "alice" aliceProfile $ \alice -> do
alice ##> "/c"
getInvitation alice
withNewTestChat 2 bobProfile $ \bob -> do
withNewTestChat "bob" bobProfile $ \bob -> do
bob ##> ("/c " <> inv)
bob <## "confirmation sent!"
withTestChat 1 $ \_ -> pure ()
withTestChat 2 $ \_ -> pure ()
withTestChat 1 $ \alice ->
withTestChat "alice" $ \_ -> pure ()
withTestChat "bob" $ \_ -> pure ()
withTestChat "alice" $ \alice ->
alice <## "1 contacts connected (use /cs for the list)"
withTestChat 2 $ \_ -> pure ()
withTestChat 1 $ \alice -> do
withTestChat "bob" $ \_ -> pure ()
withTestChat "alice" $ \alice -> do
alice <## "1 contacts connected (use /cs for the list)"
alice <## "bob (Bob): contact is connected"
withTestChat 2 $ \bob -> do
withTestChat "bob" $ \bob -> do
bob <## "1 contacts connected (use /cs for the list)"
bob <## "alice (Alice): contact is connected"
testAsyncFileTransfer :: IO ()
testAsyncFileTransfer = withTmpFiles $ do
withNewTestChat "alice" aliceProfile $ \alice ->
withNewTestChat "bob" bobProfile $ \bob ->
connectUsers alice bob
withTestChatContactConnected "alice" $ \alice -> do
alice ##> "/_send @2 file ./tests/fixtures/test.jpg text hi, sending a file"
alice <# "@bob hi, sending a file"
alice <# "/f @bob ./tests/fixtures/test.jpg"
alice <## "use /fc 1 to cancel sending"
withTestChatContactConnected "bob" $ \bob -> do
bob <# "alice> hi, sending a file"
bob <# "alice> sends file test.jpg (136.5 KiB / 139737 bytes)"
bob <## "use /fr 1 [<dir>/ | <path>] to receive it"
bob ##> "/fr 1 ./tests/tmp"
bob <## "saving file 1 from alice to ./tests/tmp/test.jpg"
withTestChatContactConnected' "alice"
withTestChatContactConnected' "bob"
withTestChatContactConnected' "alice"
withTestChatContactConnected' "bob"
withTestChatContactConnected "alice" $ \alice -> do
alice <## "started sending file 1 (test.jpg) to bob"
alice <## "completed sending file 1 (test.jpg) to bob"
withTestChatContactConnected "bob" $ \bob -> do
bob <## "started receiving file 1 (test.jpg) from alice"
bob <## "completed receiving file 1 (test.jpg) from alice"
src <- B.readFile "./tests/fixtures/test.jpg"
dest <- B.readFile "./tests/tmp/test.jpg"
dest `shouldBe` src
testAsyncGroupFileTransfer :: IO ()
testAsyncGroupFileTransfer = withTmpFiles $ do
withNewTestChat "alice" aliceProfile $ \alice ->
withNewTestChat "bob" bobProfile $ \bob ->
withNewTestChat "cath" cathProfile $ \cath ->
createGroup3 "team" alice bob cath
withTestChatGroup3Connected "alice" $ \alice -> do
alice ##> "/_send #1 file ./tests/fixtures/test.jpg json {\"text\":\"\",\"type\":\"text\"}"
alice <# "/f #team ./tests/fixtures/test.jpg"
alice <## "use /fc 1 to cancel sending"
withTestChatGroup3Connected "bob" $ \bob -> do
bob <# "#team alice> sends file test.jpg (136.5 KiB / 139737 bytes)"
bob <## "use /fr 1 [<dir>/ | <path>] to receive it"
bob ##> "/fr 1 ./tests/tmp/"
bob <## "saving file 1 from alice to ./tests/tmp/test.jpg"
withTestChatGroup3Connected "cath" $ \cath -> do
cath <# "#team alice> sends file test.jpg (136.5 KiB / 139737 bytes)"
cath <## "use /fr 1 [<dir>/ | <path>] to receive it"
cath ##> "/fr 1 ./tests/tmp/"
cath <## "saving file 1 from alice to ./tests/tmp/test_1.jpg"
withTestChatGroup3Connected' "alice"
withTestChatGroup3Connected' "bob"
withTestChatGroup3Connected' "cath"
withTestChatGroup3Connected' "alice"
withTestChatGroup3Connected' "bob"
withTestChatGroup3Connected' "cath"
withTestChatGroup3Connected' "alice"
withTestChatGroup3Connected "bob" $ \bob -> do
bob <## "started receiving file 1 (test.jpg) from alice"
withTestChatGroup3Connected "cath" $ \cath -> do
cath <## "started receiving file 1 (test.jpg) from alice"
withTestChatGroup3Connected "alice" $ \alice -> do
alice
<### [ "started sending file 1 (test.jpg) to bob",
"completed sending file 1 (test.jpg) to bob",
"started sending file 1 (test.jpg) to cath",
"completed sending file 1 (test.jpg) to cath"
]
withTestChatGroup3Connected "bob" $ \bob -> do
bob <## "completed receiving file 1 (test.jpg) from alice"
withTestChatGroup3Connected "cath" $ \cath -> do
cath <## "completed receiving file 1 (test.jpg) from alice"
src <- B.readFile "./tests/fixtures/test.jpg"
dest <- B.readFile "./tests/tmp/test.jpg"
dest `shouldBe` src
dest2 <- B.readFile "./tests/tmp/test_1.jpg"
dest2 `shouldBe` src
withTestChatContactConnected :: String -> (TestCC -> IO a) -> IO a
withTestChatContactConnected dbPrefix action =
withTestChat dbPrefix $ \cc -> do
cc <## "1 contacts connected (use /cs for the list)"
action cc
withTestChatContactConnected' :: String -> IO ()
withTestChatContactConnected' dbPrefix = withTestChatContactConnected dbPrefix $ \_ -> pure ()
withTestChatGroup3Connected :: String -> (TestCC -> IO a) -> IO a
withTestChatGroup3Connected dbPrefix action = do
withTestChat dbPrefix $ \cc -> do
cc <## "2 contacts connected (use /cs for the list)"
cc <## "#team: connected to server(s)"
action cc
withTestChatGroup3Connected' :: String -> IO ()
withTestChatGroup3Connected' dbPrefix = withTestChatGroup3Connected dbPrefix $ \_ -> pure ()
startFileTransfer :: TestCC -> TestCC -> IO ()
startFileTransfer alice bob = do
alice #> "/f @bob ./tests/fixtures/test.jpg"