mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-06-08 13:02:23 +00:00
return updated ConnectionStats from switchConnectionAsync (#777)
This commit is contained in:
@@ -205,7 +205,7 @@ ackMessageAsync :: forall m. AgentErrorMonad m => AgentClient -> ACorrId -> Conn
|
||||
ackMessageAsync c = withAgentEnv c .:. ackMessageAsync' c
|
||||
|
||||
-- | Switch connection to the new receive queue
|
||||
switchConnectionAsync :: AgentErrorMonad m => AgentClient -> ACorrId -> ConnId -> m ()
|
||||
switchConnectionAsync :: AgentErrorMonad m => AgentClient -> ACorrId -> ConnId -> m ConnectionStats
|
||||
switchConnectionAsync c = withAgentEnv c .: switchConnectionAsync' c
|
||||
|
||||
-- | Delete SMP agent connection (DEL command) asynchronously, no synchronous response
|
||||
@@ -529,15 +529,17 @@ deleteConnectionsAsync_ onSuccess c connIds = case connIds of
|
||||
deleteConnQueues c True rqs >> onSuccess
|
||||
|
||||
-- | Add connection to the new receive queue
|
||||
switchConnectionAsync' :: AgentMonad m => AgentClient -> ACorrId -> ConnId -> m ()
|
||||
switchConnectionAsync' :: AgentMonad m => AgentClient -> ACorrId -> ConnId -> m ConnectionStats
|
||||
switchConnectionAsync' c corrId connId =
|
||||
withConnLock c connId "switchConnectionAsync" $
|
||||
withStore c (`getConn` connId) >>= \case
|
||||
SomeConn _ (DuplexConnection _ rqs@(rq :| _rqs) _)
|
||||
SomeConn _ (DuplexConnection cData rqs@(rq :| _rqs) sqs)
|
||||
| isJust (switchingRQ rqs) -> throwError $ CMD PROHIBITED
|
||||
| otherwise -> do
|
||||
void $ withStore' c $ \db -> setRcvSwitchStatus db rq $ Just RSSwitchStarted
|
||||
rq1 <- withStore' c $ \db -> setRcvSwitchStatus db rq $ Just RSSwitchStarted
|
||||
enqueueCommand c corrId connId Nothing $ AClientCommand $ APC SAEConn SWCH
|
||||
let rqs' = updatedQs rq1 rqs
|
||||
pure . connectionStats $ DuplexConnection cData rqs' sqs
|
||||
_ -> throwError $ CMD PROHIBITED
|
||||
|
||||
newConn :: AgentMonad m => AgentClient -> UserId -> ConnId -> Bool -> SConnectionMode c -> Maybe CRClientData -> m (ConnId, ConnectionRequestUri c)
|
||||
|
||||
@@ -988,7 +988,8 @@ testSwitchConnection servers = do
|
||||
|
||||
testFullSwitch :: AgentClient -> ByteString -> AgentClient -> ByteString -> Int64 -> ExceptT AgentErrorType IO ()
|
||||
testFullSwitch a bId b aId msgId = do
|
||||
switchConnectionAsync a "" bId
|
||||
stats <- switchConnectionAsync a "" bId
|
||||
liftIO $ rcvSwchStatuses' stats `shouldMatchList` [Just RSSwitchStarted]
|
||||
switchComplete a bId b aId
|
||||
exchangeGreetingsMsgId msgId a bId b aId
|
||||
|
||||
@@ -1039,7 +1040,8 @@ testSwitchAsync servers = do
|
||||
let withA' = sessionSubscribe withA [bId]
|
||||
withB' = sessionSubscribe withB [aId]
|
||||
withA' $ \a -> do
|
||||
switchConnectionAsync a "" bId
|
||||
stats <- switchConnectionAsync a "" bId
|
||||
liftIO $ rcvSwchStatuses' stats `shouldMatchList` [Just RSSwitchStarted]
|
||||
phaseRcv a bId SPStarted [Just RSSendingQADD, Nothing]
|
||||
withB' $ \b -> do
|
||||
phaseSnd b aId SPStarted [Just SSSendingQKEY, Nothing]
|
||||
@@ -1082,7 +1084,8 @@ testSwitchDelete servers = do
|
||||
(aId, bId) <- makeConnection a b
|
||||
exchangeGreetingsMsgId 4 a bId b aId
|
||||
disconnectAgentClient b
|
||||
switchConnectionAsync a "" bId
|
||||
stats <- switchConnectionAsync a "" bId
|
||||
liftIO $ rcvSwchStatuses' stats `shouldMatchList` [Just RSSwitchStarted]
|
||||
phaseRcv a bId SPStarted [Just RSSendingQADD, Nothing]
|
||||
deleteConnectionAsync a bId
|
||||
get a =##> \case ("", c, DEL_RCVQ _ _ Nothing) -> c == bId; _ -> False
|
||||
@@ -1099,20 +1102,22 @@ testAbortSwitchStarted servers = do
|
||||
let withA' = sessionSubscribe withA [bId]
|
||||
withB' = sessionSubscribe withB [aId]
|
||||
withA' $ \a -> do
|
||||
switchConnectionAsync a "" bId
|
||||
stats <- switchConnectionAsync a "" bId
|
||||
liftIO $ rcvSwchStatuses' stats `shouldMatchList` [Just RSSwitchStarted]
|
||||
phaseRcv a bId SPStarted [Just RSSendingQADD, Nothing]
|
||||
-- repeat switch is prohibited
|
||||
Left Agent.CMD {cmdErr = PROHIBITED} <- runExceptT $ switchConnectionAsync a "" bId
|
||||
-- abort current switch
|
||||
stats <- abortConnectionSwitch a bId
|
||||
liftIO $ rcvSwchStatuses' stats `shouldMatchList` [Nothing]
|
||||
stats' <- abortConnectionSwitch a bId
|
||||
liftIO $ rcvSwchStatuses' stats' `shouldMatchList` [Nothing]
|
||||
withB' $ \b -> do
|
||||
phaseSnd b aId SPStarted [Just SSSendingQKEY, Nothing]
|
||||
phaseSnd b aId SPConfirmed [Just SSSendingQKEY, Nothing]
|
||||
withA' $ \a -> do
|
||||
get a ##> ("", bId, ERR (AGENT {agentErr = A_QUEUE {queueErr = "QKEY: queue address not found in connection"}}))
|
||||
-- repeat switch
|
||||
switchConnectionAsync a "" bId
|
||||
stats <- switchConnectionAsync a "" bId
|
||||
liftIO $ rcvSwchStatuses' stats `shouldMatchList` [Just RSSwitchStarted]
|
||||
phaseRcv a bId SPStarted [Just RSSendingQADD, Nothing]
|
||||
withA $ \a -> withB $ \b -> runRight_ $ do
|
||||
subscribeConnection a bId
|
||||
@@ -1147,13 +1152,15 @@ testAbortSwitchStartedReinitiate servers = do
|
||||
let withA' = sessionSubscribe withA [bId]
|
||||
withB' = sessionSubscribe withB [aId]
|
||||
withA' $ \a -> do
|
||||
switchConnectionAsync a "" bId
|
||||
stats <- switchConnectionAsync a "" bId
|
||||
liftIO $ rcvSwchStatuses' stats `shouldMatchList` [Just RSSwitchStarted]
|
||||
phaseRcv a bId SPStarted [Just RSSendingQADD, Nothing]
|
||||
-- abort current switch
|
||||
stats <- abortConnectionSwitch a bId
|
||||
liftIO $ rcvSwchStatuses' stats `shouldMatchList` [Nothing]
|
||||
stats' <- abortConnectionSwitch a bId
|
||||
liftIO $ rcvSwchStatuses' stats' `shouldMatchList` [Nothing]
|
||||
-- repeat switch
|
||||
switchConnectionAsync a "" bId
|
||||
stats'' <- switchConnectionAsync a "" bId
|
||||
liftIO $ rcvSwchStatuses' stats'' `shouldMatchList` [Just RSSwitchStarted]
|
||||
phaseRcv a bId SPStarted [Just RSSendingQADD, Nothing]
|
||||
withB' $ \b -> do
|
||||
phaseSnd b aId SPStarted [Just SSSendingQKEY, Nothing]
|
||||
@@ -1212,7 +1219,8 @@ testCannotAbortSwitchSecured servers = do
|
||||
let withA' = sessionSubscribe withA [bId]
|
||||
withB' = sessionSubscribe withB [aId]
|
||||
withA' $ \a -> do
|
||||
switchConnectionAsync a "" bId
|
||||
stats <- switchConnectionAsync a "" bId
|
||||
liftIO $ rcvSwchStatuses' stats `shouldMatchList` [Just RSSwitchStarted]
|
||||
phaseRcv a bId SPStarted [Just RSSendingQADD, Nothing]
|
||||
withB' $ \b -> do
|
||||
phaseSnd b aId SPStarted [Just SSSendingQKEY, Nothing]
|
||||
@@ -1250,9 +1258,11 @@ testSwitch2Connections servers = do
|
||||
pure (aId1, bId1, aId2, bId2)
|
||||
withA $ \a -> runRight_ $ do
|
||||
void $ subscribeConnections a [bId1, bId2]
|
||||
switchConnectionAsync a "" bId1
|
||||
stats1 <- switchConnectionAsync a "" bId1
|
||||
liftIO $ rcvSwchStatuses' stats1 `shouldMatchList` [Just RSSwitchStarted]
|
||||
phaseRcv a bId1 SPStarted [Just RSSendingQADD, Nothing]
|
||||
switchConnectionAsync a "" bId2
|
||||
stats2 <- switchConnectionAsync a "" bId2
|
||||
liftIO $ rcvSwchStatuses' stats2 `shouldMatchList` [Just RSSwitchStarted]
|
||||
phaseRcv a bId2 SPStarted [Just RSSendingQADD, Nothing]
|
||||
withA $ \a -> withB $ \b -> runRight_ $ do
|
||||
void $ subscribeConnections a [bId1, bId2]
|
||||
@@ -1306,13 +1316,15 @@ testSwitch2ConnectionsAbort1 servers = do
|
||||
let withA' = sessionSubscribe withA [bId1, bId2]
|
||||
withB' = sessionSubscribe withB [aId1, aId2]
|
||||
withA' $ \a -> do
|
||||
switchConnectionAsync a "" bId1
|
||||
stats1 <- switchConnectionAsync a "" bId1
|
||||
liftIO $ rcvSwchStatuses' stats1 `shouldMatchList` [Just RSSwitchStarted]
|
||||
phaseRcv a bId1 SPStarted [Just RSSendingQADD, Nothing]
|
||||
switchConnectionAsync a "" bId2
|
||||
stats2 <- switchConnectionAsync a "" bId2
|
||||
liftIO $ rcvSwchStatuses' stats2 `shouldMatchList` [Just RSSwitchStarted]
|
||||
phaseRcv a bId2 SPStarted [Just RSSendingQADD, Nothing]
|
||||
-- abort switch of second connection
|
||||
stats <- abortConnectionSwitch a bId2
|
||||
liftIO $ rcvSwchStatuses' stats `shouldMatchList` [Nothing]
|
||||
stats2' <- abortConnectionSwitch a bId2
|
||||
liftIO $ rcvSwchStatuses' stats2' `shouldMatchList` [Nothing]
|
||||
withB' $ \b -> do
|
||||
liftIO . getInAnyOrder b $
|
||||
[ switchPhaseSndP aId1 SPStarted [Just SSSendingQKEY, Nothing],
|
||||
|
||||
Reference in New Issue
Block a user