diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 9b6582a56..960c36390 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -375,8 +375,11 @@ subscribeSMPQueue :: SMPClient -> RcvPrivateSignKey -> RecipientId -> ExceptT Pr subscribeSMPQueue c rpKey rId = sendSMPCommand c (Just rpKey) rId SUB >>= \case OK -> return () - cmd@MSG {} -> liftIO $ writeSMPMessage c rId cmd + cmd@MSG {} -> deliver cmd + cmd@LEN {} -> deliver cmd r -> throwE . PCEUnexpectedResponse $ bshow r + where + deliver = liftIO . writeSMPMessage c rId -- | Subscribe to multiple SMP queues batching commands if supported. subscribeSMPQueues :: SMPClient -> NonEmpty (RcvPrivateSignKey, RecipientId) -> IO (NonEmpty (Either ProtocolClientError ())) @@ -385,9 +388,12 @@ subscribeSMPQueues c qs = sendProtocolCommands c cs >>= mapM response . L.zip qs cs = L.map (\(rpKey, rId) -> (Just rpKey, rId, Cmd SRecipient SUB)) qs response ((_, rId), r) = case r of Right OK -> pure $ Right () - Right cmd@MSG {} -> writeSMPMessage c rId cmd $> Right () + Right cmd@MSG {} -> deliver cmd + Right cmd@LEN {} -> deliver cmd Right r' -> pure . Left . PCEUnexpectedResponse $ bshow r' Left e -> pure $ Left e + where + deliver cmd = writeSMPMessage c rId cmd $> Right () writeSMPMessage :: SMPClient -> RecipientId -> BrokerMsg -> IO () writeSMPMessage c rId msg = atomically $ mapM_ (`writeTBQueue` serverTransmission c rId msg) (msgQ c) @@ -403,8 +409,11 @@ getSMPMessage :: SMPClient -> RcvPrivateSignKey -> RecipientId -> ExceptT Protoc getSMPMessage c rpKey rId = sendSMPCommand c (Just rpKey) rId GET >>= \case OK -> pure Nothing - cmd@(MSG msg) -> liftIO (writeSMPMessage c rId cmd) $> Just msg + cmd@(MSG msg) -> deliver cmd $> Just msg + cmd@LEN {} -> deliver cmd $> Nothing r -> throwE . PCEUnexpectedResponse $ bshow r + where + deliver = liftIO . writeSMPMessage c rId -- | Subscribe to the SMP queue notifications. -- @@ -469,8 +478,11 @@ ackSMPMessage :: SMPClient -> RcvPrivateSignKey -> QueueId -> MsgId -> ExceptT P ackSMPMessage c rpKey rId msgId = sendSMPCommand c (Just rpKey) rId (ACK msgId) >>= \case OK -> return () - cmd@MSG {} -> liftIO $ writeSMPMessage c rId cmd + cmd@MSG {} -> deliver cmd + cmd@LEN {} -> deliver cmd r -> throwE . PCEUnexpectedResponse $ bshow r + where + deliver = liftIO . writeSMPMessage c rId -- | Irreversibly suspend SMP queue. -- The existing messages from the queue will still be delivered. diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 1da8e1c07..4203467d7 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -590,7 +590,9 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv Just msg -> let encMsg = encryptMsg qr msg in atomically (setDelivered s msg) $> (corrId, rId, MSG encMsg) - _ -> forkSub $> ok + _ + | status qr == QueueActive -> forkSub $> ok + | otherwise -> pure (corrId, rId, LEN 0) _ -> pure ok where forkSub :: m () diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index bac0f5001..e5599965a 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -189,8 +189,8 @@ runAgentClientTest alice bob baseId = do get alice =##> \case ("", c, Msg "hello too") -> c == bobId; _ -> False ackMessage alice bobId $ baseId + 3 get alice =##> \case ("", c, Msg "message 1") -> c == bobId; _ -> False - ackMessage alice bobId $ baseId + 4 _ <- suspendConnection alice bobId + ackMessage alice bobId $ baseId + 4 5 <- msgId <$> sendMessage bob aliceId SMP.noMsgFlags "message 2" get bob ##> ("", aliceId, MERR (baseId + 5) (SMP AUTH)) deleteConnection alice bobId diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 27af23a0f..8b3ff6e09 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -265,7 +265,7 @@ testCreateDelete (ATransport t) = Resp "bcda" _ err2 <- signSendRecv rh rKey ("bcda", sId, OFF) (err2, ERR AUTH) #== "rejects OFF with sender's ID" - Resp "cdab" rId2 (LEN _) <- signSendRecv rh rKey ("cdab", rId, OFF) + Resp "cdab" rId2 (LEN 2) <- signSendRecv rh rKey ("cdab", rId, OFF) (rId2, rId) #== "same queue ID in response 2" Resp "dabc" _ err3 <- signSendRecv sh sKey ("dabc", sId, _SEND "hello") @@ -279,6 +279,11 @@ testCreateDelete (ATransport t) = Resp "cdab" _ (Msg mId2 msg2) <- signSendRecv rh rKey ("cdab", rId, SUB) (dec mId2 msg2, Right "hello") #== "accepts SUB when suspended and delivers the message again (because was not ACKed)" + Resp "abcd" _ (Msg mId3 msg3) <- signSendRecv rh rKey ("abcd", rId, ACK mId2) + (dec mId3 msg3, Right "hello 2") #== "deliver the next message on ACK" + + Resp "bcda" _ (LEN 0) <- signSendRecv rh rKey ("bcda", rId, ACK mId3) + Resp "dabc" _ err5 <- sendRecv rh (sampleSig, "dabc", rId, DEL) (err5, ERR AUTH) #== "rejects DEL with wrong signature"