mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 22:55:50 +00:00
rename "stop switch" -> "abort switch", add canAbortSwitch to rcv queue info (#775)
This commit is contained in:
@@ -63,7 +63,7 @@ module Simplex.Messaging.Agent
|
||||
sendMessage,
|
||||
ackMessage,
|
||||
switchConnection,
|
||||
stopConnectionSwitch,
|
||||
abortConnectionSwitch,
|
||||
suspendConnection,
|
||||
deleteConnection,
|
||||
deleteConnections,
|
||||
@@ -269,9 +269,9 @@ ackMessage c = withAgentEnv c .: ackMessage' c
|
||||
switchConnection :: AgentErrorMonad m => AgentClient -> ConnId -> m ConnectionStats
|
||||
switchConnection c = withAgentEnv c . switchConnection' c
|
||||
|
||||
-- | Stop switching connection to the new receive queue
|
||||
stopConnectionSwitch :: AgentErrorMonad m => AgentClient -> ConnId -> m ConnectionStats
|
||||
stopConnectionSwitch c = withAgentEnv c . stopConnectionSwitch' c
|
||||
-- | Abort switching connection to the new receive queue
|
||||
abortConnectionSwitch :: AgentErrorMonad m => AgentClient -> ConnId -> m ConnectionStats
|
||||
abortConnectionSwitch c = withAgentEnv c . abortConnectionSwitch' c
|
||||
|
||||
-- | Suspend SMP agent connection (OFF command)
|
||||
suspendConnection :: AgentErrorMonad m => AgentClient -> ConnId -> m ()
|
||||
@@ -1240,13 +1240,13 @@ switchDuplexConnection c (DuplexConnection cData@ConnData {connId, userId} rqs s
|
||||
let rqs' = updatedQs rq1 rqs <> [rq']
|
||||
pure . connectionStats $ DuplexConnection cData rqs' sqs
|
||||
|
||||
stopConnectionSwitch' :: AgentMonad m => AgentClient -> ConnId -> m ConnectionStats
|
||||
stopConnectionSwitch' c connId =
|
||||
withConnLock c connId "stopConnectionSwitch" $
|
||||
abortConnectionSwitch' :: AgentMonad m => AgentClient -> ConnId -> m ConnectionStats
|
||||
abortConnectionSwitch' c connId =
|
||||
withConnLock c connId "abortConnectionSwitch" $
|
||||
withStore c (`getConn` connId) >>= \case
|
||||
SomeConn _ (DuplexConnection cData rqs sqs) -> case switchingRQ rqs of
|
||||
Just rq
|
||||
| canStopRcvSwitch rq -> do
|
||||
| canAbortRcvSwitch rq -> do
|
||||
-- multiple queues to which the connections switches were possible when repeating switch was allowed
|
||||
let (delRqs, keepRqs) = L.partition ((Just (dbQId rq) ==) . dbReplaceQId) rqs
|
||||
case L.nonEmpty keepRqs of
|
||||
@@ -1263,18 +1263,6 @@ stopConnectionSwitch' c connId =
|
||||
_ -> throwError $ CMD PROHIBITED
|
||||
_ -> throwError $ CMD PROHIBITED
|
||||
|
||||
canStopRcvSwitch :: RcvQueue -> Bool
|
||||
canStopRcvSwitch = maybe False canStop . rcvSwchStatus
|
||||
where
|
||||
canStop = \case
|
||||
RSSwitchStarted -> True
|
||||
RSSendingQADD -> True
|
||||
-- if switch is in RSSendingQUSE, a race condition with sender deleting the original queue is possible
|
||||
RSSendingQUSE -> False
|
||||
-- if switch is in RSReceivedMessage status, stopping switch (deleting new queue)
|
||||
-- will break the connection because the sender would have original queue deleted
|
||||
RSReceivedMessage -> False
|
||||
|
||||
ackQueueMessage :: AgentMonad m => AgentClient -> RcvQueue -> SMP.MsgId -> m ()
|
||||
ackQueueMessage c rq srvMsgId =
|
||||
sendAck c rq srvMsgId `catchError` \case
|
||||
|
||||
@@ -561,19 +561,23 @@ instance FromJSON SndSwitchStatus where
|
||||
|
||||
data RcvQueueInfo = RcvQueueInfo
|
||||
{ rcvServer :: SMPServer,
|
||||
rcvSwitchStatus :: Maybe RcvSwitchStatus
|
||||
rcvSwitchStatus :: Maybe RcvSwitchStatus,
|
||||
canAbortSwitch :: Bool
|
||||
}
|
||||
deriving (Eq, Show, Generic)
|
||||
|
||||
instance ToJSON RcvQueueInfo where toEncoding = J.genericToEncoding J.defaultOptions {J.omitNothingFields = True}
|
||||
|
||||
instance StrEncoding RcvQueueInfo where
|
||||
strEncode RcvQueueInfo {rcvServer, rcvSwitchStatus} =
|
||||
"srv=" <> strEncode rcvServer <> maybe "" (\switch -> ";switch=" <> strEncode switch) rcvSwitchStatus
|
||||
strEncode RcvQueueInfo {rcvServer, rcvSwitchStatus, canAbortSwitch} =
|
||||
"srv=" <> strEncode rcvServer
|
||||
<> maybe "" (\switch -> ";switch=" <> strEncode switch) rcvSwitchStatus
|
||||
<> (";can_abort_switch=" <> strEncode canAbortSwitch)
|
||||
strP = do
|
||||
rcvServer <- "srv=" *> strP
|
||||
rcvSwitchStatus <- optional $ ";switch=" *> strP
|
||||
pure RcvQueueInfo {rcvServer, rcvSwitchStatus}
|
||||
canAbortSwitch <- ";can_abort_switch=" *> strP
|
||||
pure RcvQueueInfo {rcvServer, rcvSwitchStatus, canAbortSwitch}
|
||||
|
||||
data SndQueueInfo = SndQueueInfo
|
||||
{ sndServer :: SMPServer,
|
||||
|
||||
@@ -83,8 +83,20 @@ data RcvQueue = RcvQueue
|
||||
deriving (Eq, Show)
|
||||
|
||||
rcvQueueInfo :: RcvQueue -> RcvQueueInfo
|
||||
rcvQueueInfo RcvQueue {server, rcvSwchStatus} =
|
||||
RcvQueueInfo {rcvServer = server, rcvSwitchStatus = rcvSwchStatus}
|
||||
rcvQueueInfo rq@RcvQueue {server, rcvSwchStatus} =
|
||||
RcvQueueInfo {rcvServer = server, rcvSwitchStatus = rcvSwchStatus, canAbortSwitch = canAbortRcvSwitch rq}
|
||||
|
||||
canAbortRcvSwitch :: RcvQueue -> Bool
|
||||
canAbortRcvSwitch = maybe False canAbort . rcvSwchStatus
|
||||
where
|
||||
canAbort = \case
|
||||
RSSwitchStarted -> True
|
||||
RSSendingQADD -> True
|
||||
-- if switch is in RSSendingQUSE, a race condition with sender deleting the original queue is possible
|
||||
RSSendingQUSE -> False
|
||||
-- if switch is in RSReceivedMessage status, aborting switch (deleting new queue)
|
||||
-- will break the connection because the sender would have original queue deleted
|
||||
RSReceivedMessage -> False
|
||||
|
||||
data ClientNtfCreds = ClientNtfCreds
|
||||
{ -- | key pair to be used by the notification server to sign transmissions
|
||||
|
||||
@@ -210,16 +210,16 @@ functionalAPITests t = do
|
||||
testServerMatrix2 t testSwitchAsync
|
||||
describe "should delete connection during switch" $
|
||||
testServerMatrix2 t testSwitchDelete
|
||||
describe "should stop switch in Started phase" $
|
||||
testServerMatrix2 t testStopSwitchStarted
|
||||
describe "should stop switch in Started phase, reinitiate immediately" $
|
||||
testServerMatrix2 t testStopSwitchStartedReinitiate
|
||||
describe "should prohibit to stop switch in Secured phase" $
|
||||
testServerMatrix2 t testCannotStopSwitchSecured
|
||||
describe "should abort switch in Started phase" $
|
||||
testServerMatrix2 t testAbortSwitchStarted
|
||||
describe "should abort switch in Started phase, reinitiate immediately" $
|
||||
testServerMatrix2 t testAbortSwitchStartedReinitiate
|
||||
describe "should prohibit to abort switch in Secured phase" $
|
||||
testServerMatrix2 t testCannotAbortSwitchSecured
|
||||
describe "should switch two connections simultaneously" $
|
||||
testServerMatrix2 t testSwitch2Connections
|
||||
describe "should switch two connections simultaneously, stop one" $
|
||||
testServerMatrix2 t testSwitch2ConnectionsStop1
|
||||
describe "should switch two connections simultaneously, abort one" $
|
||||
testServerMatrix2 t testSwitch2ConnectionsAbort1
|
||||
describe "SMP basic auth" $ do
|
||||
describe "with server auth" $ do
|
||||
-- allow NEW | server auth, v | clnt1 auth, v | clnt2 auth, v | 2 - success, 1 - JOIN fail, 0 - NEW fail
|
||||
@@ -1090,8 +1090,8 @@ testSwitchDelete servers = do
|
||||
get a =##> \case ("", c, DEL_CONN) -> c == bId; _ -> False
|
||||
liftIO $ noMessages a "nothing else should be delivered to alice"
|
||||
|
||||
testStopSwitchStarted :: HasCallStack => InitialAgentServers -> IO ()
|
||||
testStopSwitchStarted servers = do
|
||||
testAbortSwitchStarted :: HasCallStack => InitialAgentServers -> IO ()
|
||||
testAbortSwitchStarted servers = do
|
||||
(aId, bId) <- withA $ \a -> withB $ \b -> runRight $ do
|
||||
(aId, bId) <- makeConnection a b
|
||||
exchangeGreetingsMsgId 4 a bId b aId
|
||||
@@ -1103,8 +1103,8 @@ testStopSwitchStarted servers = do
|
||||
phaseRcv a bId SPStarted [Just RSSendingQADD, Nothing]
|
||||
-- repeat switch is prohibited
|
||||
Left Agent.CMD {cmdErr = PROHIBITED} <- runExceptT $ switchConnectionAsync a "" bId
|
||||
-- stop current switch
|
||||
stats <- stopConnectionSwitch a bId
|
||||
-- abort current switch
|
||||
stats <- abortConnectionSwitch a bId
|
||||
liftIO $ rcvSwchStatuses' stats `shouldMatchList` [Nothing]
|
||||
withB' $ \b -> do
|
||||
phaseSnd b aId SPStarted [Just SSSendingQKEY, Nothing]
|
||||
@@ -1138,8 +1138,8 @@ testStopSwitchStarted servers = do
|
||||
withB :: (AgentClient -> IO a) -> IO a
|
||||
withB = withAgent agentCfg {initialClientId = 1} servers testDB2
|
||||
|
||||
testStopSwitchStartedReinitiate :: HasCallStack => InitialAgentServers -> IO ()
|
||||
testStopSwitchStartedReinitiate servers = do
|
||||
testAbortSwitchStartedReinitiate :: HasCallStack => InitialAgentServers -> IO ()
|
||||
testAbortSwitchStartedReinitiate servers = do
|
||||
(aId, bId) <- withA $ \a -> withB $ \b -> runRight $ do
|
||||
(aId, bId) <- makeConnection a b
|
||||
exchangeGreetingsMsgId 4 a bId b aId
|
||||
@@ -1149,8 +1149,8 @@ testStopSwitchStartedReinitiate servers = do
|
||||
withA' $ \a -> do
|
||||
switchConnectionAsync a "" bId
|
||||
phaseRcv a bId SPStarted [Just RSSendingQADD, Nothing]
|
||||
-- stop current switch
|
||||
stats <- stopConnectionSwitch a bId
|
||||
-- abort current switch
|
||||
stats <- abortConnectionSwitch a bId
|
||||
liftIO $ rcvSwchStatuses' stats `shouldMatchList` [Nothing]
|
||||
-- repeat switch
|
||||
switchConnectionAsync a "" bId
|
||||
@@ -1203,8 +1203,8 @@ errQueueNotFoundP cId = \case
|
||||
(_, cId', ERR AGENT {agentErr = A_QUEUE {queueErr = "QKEY: queue address not found in connection"}}) -> cId' == cId
|
||||
_ -> False
|
||||
|
||||
testCannotStopSwitchSecured :: HasCallStack => InitialAgentServers -> IO ()
|
||||
testCannotStopSwitchSecured servers = do
|
||||
testCannotAbortSwitchSecured :: HasCallStack => InitialAgentServers -> IO ()
|
||||
testCannotAbortSwitchSecured servers = do
|
||||
(aId, bId) <- withA $ \a -> withB $ \b -> runRight $ do
|
||||
(aId, bId) <- makeConnection a b
|
||||
exchangeGreetingsMsgId 4 a bId b aId
|
||||
@@ -1220,7 +1220,7 @@ testCannotStopSwitchSecured servers = do
|
||||
withA' $ \a -> do
|
||||
phaseRcv a bId SPConfirmed [Just RSSendingQADD, Nothing]
|
||||
phaseRcv a bId SPSecured [Just RSSendingQUSE, Nothing]
|
||||
Left Agent.CMD {cmdErr = PROHIBITED} <- runExceptT $ stopConnectionSwitch a bId
|
||||
Left Agent.CMD {cmdErr = PROHIBITED} <- runExceptT $ abortConnectionSwitch a bId
|
||||
pure ()
|
||||
withA $ \a -> withB $ \b -> runRight_ $ do
|
||||
subscribeConnection a bId
|
||||
@@ -1295,8 +1295,8 @@ testSwitch2Connections servers = do
|
||||
withB :: (AgentClient -> IO a) -> IO a
|
||||
withB = withAgent agentCfg {initialClientId = 1} servers testDB2
|
||||
|
||||
testSwitch2ConnectionsStop1 :: HasCallStack => InitialAgentServers -> IO ()
|
||||
testSwitch2ConnectionsStop1 servers = do
|
||||
testSwitch2ConnectionsAbort1 :: HasCallStack => InitialAgentServers -> IO ()
|
||||
testSwitch2ConnectionsAbort1 servers = do
|
||||
(aId1, bId1, aId2, bId2) <- withA $ \a -> withB $ \b -> runRight $ do
|
||||
(aId1, bId1) <- makeConnection a b
|
||||
exchangeGreetingsMsgId 4 a bId1 b aId1
|
||||
@@ -1310,8 +1310,8 @@ testSwitch2ConnectionsStop1 servers = do
|
||||
phaseRcv a bId1 SPStarted [Just RSSendingQADD, Nothing]
|
||||
switchConnectionAsync a "" bId2
|
||||
phaseRcv a bId2 SPStarted [Just RSSendingQADD, Nothing]
|
||||
-- stop switch of second connection
|
||||
stats <- stopConnectionSwitch a bId2
|
||||
-- abort switch of second connection
|
||||
stats <- abortConnectionSwitch a bId2
|
||||
liftIO $ rcvSwchStatuses' stats `shouldMatchList` [Nothing]
|
||||
withB' $ \b -> do
|
||||
liftIO . getInAnyOrder b $
|
||||
|
||||
Reference in New Issue
Block a user