mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-25 18:34:37 +00:00
send LEN to ACK/SUB when suspended queue has no messages
This commit is contained in:
@@ -375,8 +375,11 @@ subscribeSMPQueue :: SMPClient -> RcvPrivateSignKey -> RecipientId -> ExceptT Pr
|
||||
subscribeSMPQueue c rpKey rId =
|
||||
sendSMPCommand c (Just rpKey) rId SUB >>= \case
|
||||
OK -> return ()
|
||||
cmd@MSG {} -> liftIO $ writeSMPMessage c rId cmd
|
||||
cmd@MSG {} -> deliver cmd
|
||||
cmd@LEN {} -> deliver cmd
|
||||
r -> throwE . PCEUnexpectedResponse $ bshow r
|
||||
where
|
||||
deliver = liftIO . writeSMPMessage c rId
|
||||
|
||||
-- | Subscribe to multiple SMP queues batching commands if supported.
|
||||
subscribeSMPQueues :: SMPClient -> NonEmpty (RcvPrivateSignKey, RecipientId) -> IO (NonEmpty (Either ProtocolClientError ()))
|
||||
@@ -385,9 +388,12 @@ subscribeSMPQueues c qs = sendProtocolCommands c cs >>= mapM response . L.zip qs
|
||||
cs = L.map (\(rpKey, rId) -> (Just rpKey, rId, Cmd SRecipient SUB)) qs
|
||||
response ((_, rId), r) = case r of
|
||||
Right OK -> pure $ Right ()
|
||||
Right cmd@MSG {} -> writeSMPMessage c rId cmd $> Right ()
|
||||
Right cmd@MSG {} -> deliver cmd
|
||||
Right cmd@LEN {} -> deliver cmd
|
||||
Right r' -> pure . Left . PCEUnexpectedResponse $ bshow r'
|
||||
Left e -> pure $ Left e
|
||||
where
|
||||
deliver cmd = writeSMPMessage c rId cmd $> Right ()
|
||||
|
||||
writeSMPMessage :: SMPClient -> RecipientId -> BrokerMsg -> IO ()
|
||||
writeSMPMessage c rId msg = atomically $ mapM_ (`writeTBQueue` serverTransmission c rId msg) (msgQ c)
|
||||
@@ -403,8 +409,11 @@ getSMPMessage :: SMPClient -> RcvPrivateSignKey -> RecipientId -> ExceptT Protoc
|
||||
getSMPMessage c rpKey rId =
|
||||
sendSMPCommand c (Just rpKey) rId GET >>= \case
|
||||
OK -> pure Nothing
|
||||
cmd@(MSG msg) -> liftIO (writeSMPMessage c rId cmd) $> Just msg
|
||||
cmd@(MSG msg) -> deliver cmd $> Just msg
|
||||
cmd@LEN {} -> deliver cmd $> Nothing
|
||||
r -> throwE . PCEUnexpectedResponse $ bshow r
|
||||
where
|
||||
deliver = liftIO . writeSMPMessage c rId
|
||||
|
||||
-- | Subscribe to the SMP queue notifications.
|
||||
--
|
||||
@@ -469,8 +478,11 @@ ackSMPMessage :: SMPClient -> RcvPrivateSignKey -> QueueId -> MsgId -> ExceptT P
|
||||
ackSMPMessage c rpKey rId msgId =
|
||||
sendSMPCommand c (Just rpKey) rId (ACK msgId) >>= \case
|
||||
OK -> return ()
|
||||
cmd@MSG {} -> liftIO $ writeSMPMessage c rId cmd
|
||||
cmd@MSG {} -> deliver cmd
|
||||
cmd@LEN {} -> deliver cmd
|
||||
r -> throwE . PCEUnexpectedResponse $ bshow r
|
||||
where
|
||||
deliver = liftIO . writeSMPMessage c rId
|
||||
|
||||
-- | Irreversibly suspend SMP queue.
|
||||
-- The existing messages from the queue will still be delivered.
|
||||
|
||||
@@ -590,7 +590,9 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
|
||||
Just msg ->
|
||||
let encMsg = encryptMsg qr msg
|
||||
in atomically (setDelivered s msg) $> (corrId, rId, MSG encMsg)
|
||||
_ -> forkSub $> ok
|
||||
_
|
||||
| status qr == QueueActive -> forkSub $> ok
|
||||
| otherwise -> pure (corrId, rId, LEN 0)
|
||||
_ -> pure ok
|
||||
where
|
||||
forkSub :: m ()
|
||||
|
||||
@@ -189,8 +189,8 @@ runAgentClientTest alice bob baseId = do
|
||||
get alice =##> \case ("", c, Msg "hello too") -> c == bobId; _ -> False
|
||||
ackMessage alice bobId $ baseId + 3
|
||||
get alice =##> \case ("", c, Msg "message 1") -> c == bobId; _ -> False
|
||||
ackMessage alice bobId $ baseId + 4
|
||||
_ <- suspendConnection alice bobId
|
||||
ackMessage alice bobId $ baseId + 4
|
||||
5 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "message 2"
|
||||
get bob ##> ("", aliceId, MERR (baseId + 5) (SMP AUTH))
|
||||
deleteConnection alice bobId
|
||||
|
||||
@@ -265,7 +265,7 @@ testCreateDelete (ATransport t) =
|
||||
Resp "bcda" _ err2 <- signSendRecv rh rKey ("bcda", sId, OFF)
|
||||
(err2, ERR AUTH) #== "rejects OFF with sender's ID"
|
||||
|
||||
Resp "cdab" rId2 (LEN _) <- signSendRecv rh rKey ("cdab", rId, OFF)
|
||||
Resp "cdab" rId2 (LEN 2) <- signSendRecv rh rKey ("cdab", rId, OFF)
|
||||
(rId2, rId) #== "same queue ID in response 2"
|
||||
|
||||
Resp "dabc" _ err3 <- signSendRecv sh sKey ("dabc", sId, _SEND "hello")
|
||||
@@ -279,6 +279,11 @@ testCreateDelete (ATransport t) =
|
||||
Resp "cdab" _ (Msg mId2 msg2) <- signSendRecv rh rKey ("cdab", rId, SUB)
|
||||
(dec mId2 msg2, Right "hello") #== "accepts SUB when suspended and delivers the message again (because was not ACKed)"
|
||||
|
||||
Resp "abcd" _ (Msg mId3 msg3) <- signSendRecv rh rKey ("abcd", rId, ACK mId2)
|
||||
(dec mId3 msg3, Right "hello 2") #== "deliver the next message on ACK"
|
||||
|
||||
Resp "bcda" _ (LEN 0) <- signSendRecv rh rKey ("bcda", rId, ACK mId3)
|
||||
|
||||
Resp "dabc" _ err5 <- sendRecv rh (sampleSig, "dabc", rId, DEL)
|
||||
(err5, ERR AUTH) #== "rejects DEL with wrong signature"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user