agent: aggregate multiple expired subscription responses into a single UP event (#1160)

* agent: aggregate multiple expired subscription responses into a single UP event

* clean up

* refactor processing of expired responses

* refactor

* refactor 2

* refactor unexpectedResponse
This commit is contained in:
Evgeny Poberezkin
2024-05-20 07:56:51 +01:00
committed by GitHub
parent 7a15ea59c9
commit 8b21f7ef2a
8 changed files with 180 additions and 138 deletions

View File

@@ -14,6 +14,7 @@ module Simplex.FileTransfer.Client where
import Control.Logger.Simple
import Control.Monad
import Control.Monad.Except
import Control.Monad.Trans.Except
import Crypto.Random (ChaChaDRG)
import Data.Bifunctor (first)
import Data.ByteString.Builder (Builder, byteString)
@@ -38,6 +39,7 @@ import Simplex.Messaging.Client
defaultNetworkConfig,
proxyUsername,
transportClientConfig,
unexpectedResponse,
)
import Simplex.Messaging.Client.Agent ()
import qualified Simplex.Messaging.Crypto as C
@@ -56,7 +58,7 @@ import Simplex.Messaging.Transport.Client (TransportClientConfig, TransportHost,
import Simplex.Messaging.Transport.HTTP2
import Simplex.Messaging.Transport.HTTP2.Client
import Simplex.Messaging.Transport.HTTP2.File
import Simplex.Messaging.Util (bshow, liftEitherWith, liftError', tshow, whenM)
import Simplex.Messaging.Util (liftEitherWith, liftError', tshow, whenM)
import Simplex.Messaging.Version
import UnliftIO
import UnliftIO.Directory
@@ -228,13 +230,13 @@ createXFTPChunk ::
createXFTPChunk c spKey file rcps auth_ =
sendXFTPCommand c spKey "" (FNEW file rcps auth_) Nothing >>= \case
(FRSndIds sId rIds, body) -> noFile body (sId, rIds)
(r, _) -> throwError . PCEUnexpectedResponse $ bshow r
(r, _) -> throwE $ unexpectedResponse r
addXFTPRecipients :: XFTPClient -> C.APrivateAuthKey -> XFTPFileId -> NonEmpty C.APublicAuthKey -> ExceptT XFTPClientError IO (NonEmpty RecipientId)
addXFTPRecipients c spKey fId rcps =
sendXFTPCommand c spKey fId (FADD rcps) Nothing >>= \case
(FRRcvIds rIds, body) -> noFile body rIds
(r, _) -> throwError . PCEUnexpectedResponse $ bshow r
(r, _) -> throwE $ unexpectedResponse r
uploadXFTPChunk :: XFTPClient -> C.APrivateAuthKey -> XFTPFileId -> XFTPChunkSpec -> ExceptT XFTPClientError IO ()
uploadXFTPChunk c spKey fId chunkSpec =
@@ -262,7 +264,7 @@ downloadXFTPChunk g c@XFTPClient {config} rpKey fId chunkSpec@XFTPRcvChunkSpec {
receiveEncFile chunkPart cbState chunkSpec `catchError` \e ->
whenM (doesFileExist filePath) (removeFile filePath) >> throwError e
_ -> throwError $ PCEResponseError NO_FILE
(r, _) -> throwError . PCEUnexpectedResponse $ bshow r
(r, _) -> throwE $ unexpectedResponse r
xftpReqTimeout :: XFTPClientConfig -> Maybe Word32 -> Int
xftpReqTimeout cfg@XFTPClientConfig {xftpNetworkConfig = NetworkConfig {tcpTimeout}} chunkSize_ =
@@ -286,12 +288,12 @@ pingXFTP c@XFTPClient {thParams} = do
(r, _) <- sendXFTPTransmission c t Nothing
case r of
FRPong -> pure ()
_ -> throwError $ PCEUnexpectedResponse $ bshow r
_ -> throwE $ unexpectedResponse r
okResponse :: (FileResponse, HTTP2Body) -> ExceptT XFTPClientError IO ()
okResponse = \case
(FROk, body) -> noFile body ()
(r, _) -> throwError . PCEUnexpectedResponse $ bshow r
(r, _) -> throwE $ unexpectedResponse r
-- TODO this currently does not check anything because response size is not set and bodyPart is always Just
noFile :: HTTP2Body -> a -> ExceptT XFTPClientError IO a

View File

@@ -159,7 +159,7 @@ import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.SQLite
import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB
import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations
import Simplex.Messaging.Client (ProtocolClient (..), ServerTransmission, TransmissionType (..))
import Simplex.Messaging.Client (ProtocolClient (..), SMPClientError, ServerTransmission (..), ServerTransmissionBatch, temporaryClientError, unexpectedResponse)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.File (CryptoFile, CryptoFileArgs)
import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport (..), pattern PQEncOff, pattern PQEncOn, pattern PQSupportOff, pattern PQSupportOn)
@@ -170,7 +170,7 @@ import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfReg
import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..))
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Parsers (parse)
import Simplex.Messaging.Protocol (BrokerMsg, EntityId, ErrorType (AUTH), MsgBody, MsgFlags (..), NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SProtocolType (..), SndPublicAuthKey, SubscriptionMode (..), UserProtocol, VersionSMPC, XFTPServerWithAuth)
import Simplex.Messaging.Protocol (BrokerMsg, Cmd (..), EntityId, ErrorType (AUTH), MsgBody, MsgFlags (..), NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SParty (..), SProtocolType (..), SndPublicAuthKey, SubscriptionMode (..), UserProtocol, VersionSMPC, XFTPServerWithAuth)
import qualified Simplex.Messaging.Protocol as SMP
import Simplex.Messaging.ServiceScheme (ServiceScheme (..))
import qualified Simplex.Messaging.TMap as TM
@@ -1998,14 +1998,10 @@ getSMPServer c userId = withUserServers c userId pickServer
{-# INLINE getSMPServer #-}
subscriber :: AgentClient -> AM' ()
subscriber c@AgentClient {subQ, msgQ} = forever $ do
subscriber c@AgentClient {msgQ} = forever $ do
t <- atomically $ readTBQueue msgQ
agentOperationBracket c AORcvNetwork waitUntilActive $
tryAgentError' (processSMPTransmission c t) >>= \case
Left e -> do
logError $ tshow e
atomically $ writeTBQueue subQ ("", "", APC SAEConn $ ERR e)
Right _ -> return ()
processSMPTransmissions c t
cleanupManager :: AgentClient -> AM' ()
cleanupManager c@AgentClient {subQ} = do
@@ -2079,28 +2075,72 @@ cleanupManager c@AgentClient {subQ} = do
data ACKd = ACKd | ACKPending
-- | make sure to ACK or throw in each message processing branch
-- it cannot be finally, unfortunately, as sometimes it needs to be ACK+DEL
processSMPTransmission :: AgentClient -> ServerTransmission SMPVersion ErrorType BrokerMsg -> AM ()
processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, sessId, tType, rId, cmd) = do
(rq, SomeConn _ conn) <- withStore c (\db -> getRcvConn db srv rId)
processSMP rq conn $ toConnData conn
-- | Make sure to ACK or throw in each message processing branch
-- It cannot be finally, as sometimes it needs to be ACK+DEL,
-- and sometimes ACK has to be sent from the consumer.
processSMPTransmissions :: AgentClient -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> AM' ()
processSMPTransmissions c@AgentClient {subQ} (tSess@(_, srv, _), _v, sessId, ts) = do
upConnIds <- newTVarIO []
forM_ ts $ \(entId, t) -> case t of
STEvent msgOrErr ->
withRcvConn entId $ \rq@RcvQueue {connId} conn -> case msgOrErr of
Right msg -> processSMP rq conn (toConnData conn) msg
Left e -> lift $ notifyErr connId e
STResponse (Cmd SRecipient cmd) respOrErr ->
withRcvConn entId $ \rq conn -> case cmd of
-- TODO process expired responses to ACK and DEL
SMP.SUB -> case respOrErr of
Right SMP.OK -> processSubOk rq upConnIds
Right msg@SMP.MSG {} -> do
processSubOk rq upConnIds
processSMP rq conn (toConnData conn) msg
Right r -> processSubErr rq $ unexpectedResponse r
Left e -> unless (temporaryClientError e) $ processSubErr rq e -- timeout/network was already reported
_ -> pure ()
STResponse {} -> pure () -- TODO process expired responses to sent messages
STUnexpectedError e -> do
logServer "<--" c srv entId $ "error: " <> bshow e
notifyErr "" e
connIds <- readTVarIO upConnIds
unless (null connIds) $ notify' "" $ UP srv connIds
where
processSMP :: forall c. RcvQueue -> Connection c -> ConnData -> AM ()
withRcvConn :: SMP.RecipientId -> (forall c. RcvQueue -> Connection c -> AM ()) -> AM' ()
withRcvConn rId a = do
tryAgentError' (withStore c $ \db -> getRcvConn db srv rId) >>= \case
Left e -> notify' "" (ERR e)
Right (rq@RcvQueue {connId}, SomeConn _ conn) ->
tryAgentError' (a rq conn) >>= \case
Left e -> notify' connId (ERR e)
Right () -> pure ()
processSubOk :: RcvQueue -> TVar [ConnId] -> AM ()
processSubOk rq@RcvQueue {connId} upConnIds =
atomically . whenM (isPendingSub connId) $ do
addSubscription c rq
modifyTVar' upConnIds (connId :)
processSubErr :: RcvQueue -> SMPClientError -> AM ()
processSubErr rq@RcvQueue {connId} e = do
atomically . whenM (isPendingSub connId) $ failSubscription c rq e
lift $ notifyErr connId e
isPendingSub connId = (&&) <$> hasPendingSubscription c connId <*> activeClientSession c tSess sessId
notify' :: forall e m. (AEntityI e, MonadIO m) => ConnId -> ACommand 'Agent e -> m ()
notify' connId msg = atomically $ writeTBQueue subQ ("", connId, APC (sAEntity @e) msg)
notifyErr :: ConnId -> SMPClientError -> AM' ()
notifyErr connId = notify' connId . ERR . protocolClientError SMP (B.unpack $ strEncode srv)
processSMP :: forall c. RcvQueue -> Connection c -> ConnData -> BrokerMsg -> AM ()
processSMP
rq@RcvQueue {e2ePrivKey, e2eDhSecret, status}
rq@RcvQueue {rcvId = rId, e2ePrivKey, e2eDhSecret, status}
conn
cData@ConnData {userId, connId, connAgentVersion, ratchetSyncState = rss} =
withConnLock c connId "processSMP" $ case cmd of
Right r@(SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId}) ->
cData@ConnData {userId, connId, connAgentVersion, ratchetSyncState = rss}
smpMsg =
withConnLock c connId "processSMP" $ case smpMsg of
SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId} ->
void . handleNotifyAck $ do
isGET <- atomically $ hasGetLock c rq
unless isGET $ checkExpiredResponse r
msg' <- decryptSMPMessage rq msg
ack' <- handleNotifyAck $ case msg' of
SMP.ClientRcvMsgBody {msgTs = srvTs, msgFlags, msgBody} -> processClientMsg srvTs msgFlags msgBody
SMP.ClientRcvMsgQuota {} -> queueDrained >> ack
when isGET $ notify (MSGNTF $ SMP.rcvMessageMeta srvMsgId msg')
whenM (atomically $ hasGetLock c rq) $
notify (MSGNTF $ SMP.rcvMessageMeta srvMsgId msg')
pure ack'
where
queueDrained = case conn of
@@ -2259,29 +2299,23 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
ackDel aId = enqueueCmd (ICAckDel rId srvMsgId aId) $> ACKd
handleNotifyAck :: AM ACKd -> AM ACKd
handleNotifyAck m = m `catchAgentError` \e -> notify (ERR e) >> ack
Right SMP.END ->
atomically (TM.lookup tSess smpClients $>>= (tryReadTMVar . sessionVar) >>= processEND)
>>= logServer "<--" c srv rId
SMP.END ->
atomically (TM.lookup tSess (smpClients c) $>>= (tryReadTMVar . sessionVar) >>= processEND)
>>= notifyEnd
where
processEND = \case
Just (Right clnt)
| sessId == sessionId (thParams $ connectedClient clnt) -> do
removeSubscription c connId
notify' END
pure "END"
| otherwise -> ignored
_ -> ignored
ignored = pure "END from disconnected client - ignored"
Right (SMP.ERR e) -> notify $ ERR $ SMP (B.unpack $ strEncode srv) e
Right r@SMP.OK -> checkExpiredResponse r
Right r -> unexpected r
Left e -> notify $ ERR $ protocolClientError SMP (B.unpack $ strEncode srv) e
| sessId == sessionId (thParams $ connectedClient clnt) ->
removeSubscription c connId $> True
_ -> pure False
notifyEnd removed
| removed = notify END >> logServer "<--" c srv rId "END"
| otherwise = logServer "<--" c srv rId "END from disconnected client - ignored"
SMP.ERR e -> notify $ ERR $ SMP (B.unpack $ strEncode srv) e
r -> unexpected r
where
notify :: forall e m. MonadIO m => AEntityI e => ACommand 'Agent e -> m ()
notify = atomically . notify'
notify' :: forall e. AEntityI e => ACommand 'Agent e -> STM ()
notify' msg = writeTBQueue subQ ("", connId, APC (sAEntity @e) msg)
notify :: forall e m. (AEntityI e, MonadIO m) => ACommand 'Agent e -> m ()
notify = notify' connId
prohibited :: AM ()
prohibited = notify . ERR $ AGENT A_PROHIBITED
@@ -2291,25 +2325,10 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v,
unexpected :: BrokerMsg -> AM ()
unexpected r = do
logServer "<--" c srv rId $ "unexpected: " <> bshow cmd
logServer "<--" c srv rId $ "unexpected: " <> bshow r
-- TODO add extended information about transmission type once UNEXPECTED has string
notify . ERR $ BROKER (B.unpack $ strEncode srv) $ UNEXPECTED (take 32 $ show r)
checkExpiredResponse :: BrokerMsg -> AM ()
checkExpiredResponse r = case tType of
TTEvent -> pure ()
TTUncorrelatedResponse -> unexpected r
TTExpiredResponse (SMP.Cmd _ cmd') -> case cmd' of
SMP.SUB -> do
added <-
atomically $
ifM
((&&) <$> hasPendingSubscription c connId <*> activeClientSession c tSess sessId)
(True <$ addSubscription c rq)
(pure False)
when added $ notify $ UP srv [connId]
_ -> pure ()
decryptClientMessage :: C.DhSecretX25519 -> SMP.ClientMsgEnvelope -> AM (SMP.PrivHeader, AgentMsgEnvelope)
decryptClientMessage e2eDh SMP.ClientMsgEnvelope {cmNonce, cmEncBody} = do
clientMsg <- agentCbDecrypt e2eDh cmNonce cmEncBody

View File

@@ -41,6 +41,7 @@ module Simplex.Messaging.Agent.Client
getQueueMessage,
decryptSMPMessage,
addSubscription,
failSubscription,
addNewQueueSubscription,
getSubscriptions,
sendConfirmation,
@@ -268,7 +269,7 @@ data AgentClient = AgentClient
active :: TVar Bool,
rcvQ :: TBQueue (ATransmission 'Client),
subQ :: TBQueue (ATransmission 'Agent),
msgQ :: TBQueue (ServerTransmission SMPVersion ErrorType BrokerMsg),
msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg),
smpServers :: TMap UserId (NonEmpty SMPServerWithAuth),
smpClients :: TMap SMPTransportSession SMPClientVar,
-- smpProxiedRelays:
@@ -1079,7 +1080,7 @@ protocolClientError :: (Show err, Encoding err) => (HostName -> err -> AgentErro
protocolClientError protocolError_ host = \case
PCEProtocolError e -> protocolError_ host e
PCEResponseError e -> BROKER host $ RESPONSE $ B.unpack $ smpEncode e
PCEUnexpectedResponse r -> BROKER host $ UNEXPECTED $ take 32 $ show r
PCEUnexpectedResponse e -> BROKER host $ UNEXPECTED $ B.unpack e
PCEResponseTimeout -> BROKER host TIMEOUT
PCENetworkError -> BROKER host NETWORK
PCEIncompatibleHost -> BROKER host HOST
@@ -1269,9 +1270,8 @@ newRcvQueue c userId connId (ProtoServerWithAuth srv auth) vRange subMode = do
processSubResult :: AgentClient -> RcvQueue -> Either SMPClientError () -> STM ()
processSubResult c rq@RcvQueue {connId} = \case
Left e ->
unless (temporaryClientError e) $ do
RQ.deleteQueue rq (pendingSubs c)
TM.insert (RQ.qKey rq) e (removedSubs c)
unless (temporaryClientError e) $
failSubscription c rq e
Right () ->
whenM (hasPendingSubscription c connId) $
addSubscription c rq
@@ -1391,6 +1391,11 @@ addSubscription c rq@RcvQueue {connId} = do
RQ.addQueue rq $ activeSubs c
RQ.deleteQueue rq $ pendingSubs c
failSubscription :: AgentClient -> RcvQueue -> SMPClientError -> STM ()
failSubscription c rq e = do
RQ.deleteQueue rq (pendingSubs c)
TM.insert (RQ.qKey rq) e (removedSubs c)
addPendingSubscription :: AgentClient -> RcvQueue -> STM ()
addPendingSubscription c rq@RcvQueue {connId} = do
modifyTVar' (subscrConns c) $ S.insert connId

View File

@@ -65,6 +65,7 @@ module Simplex.Messaging.Client
ProtocolClientError (..),
SMPClientError,
ProxyClientError (..),
unexpectedResponse,
ProtocolClientConfig (..),
NetworkConfig (..),
TransportSessionMode (..),
@@ -80,8 +81,8 @@ module Simplex.Messaging.Client
proxyUsername,
temporaryClientError,
smpProxyError,
ServerTransmission,
TransmissionType (..),
ServerTransmissionBatch,
ServerTransmission (..),
ClientCommand,
-- * For testing
@@ -155,7 +156,7 @@ data PClient v err msg = PClient
sentCommands :: TMap CorrId (Request err msg),
sndQ :: TBQueue ByteString,
rcvQ :: TBQueue (NonEmpty (SignedTransmission err msg)),
msgQ :: Maybe (TBQueue (ServerTransmission v err msg))
msgQ :: Maybe (TBQueue (ServerTransmissionBatch v err msg))
}
smpClientStub :: TVar ChaChaDRG -> ByteString -> VersionSMP -> Maybe (THandleAuth 'TClient) -> STM SMPClient
@@ -206,10 +207,14 @@ type SMPClient = ProtocolClient SMPVersion ErrorType BrokerMsg
-- | Type for client command data
type ClientCommand msg = (Maybe C.APrivateAuthKey, EntityId, ProtoCommand msg)
-- | Type synonym for transmission from some SPM server queue.
type ServerTransmission v err msg = (TransportSession msg, Version v, SessionId, TransmissionType msg, EntityId, Either (ProtocolClientError err) msg)
-- | Type synonym for transmission from SPM servers.
-- Batch response is presented as a single `ServerTransmissionBatch` tuple.
type ServerTransmissionBatch v err msg = (TransportSession msg, Version v, SessionId, NonEmpty (EntityId, ServerTransmission err msg))
data TransmissionType msg = TTEvent | TTUncorrelatedResponse | TTExpiredResponse (ProtoCommand msg)
data ServerTransmission err msg
= STEvent (Either (ProtocolClientError err) msg)
| STResponse (ProtoCommand msg) (Either (ProtocolClientError err) msg)
| STUnexpectedError (ProtocolClientError err)
data HostMode
= -- | prefer (or require) onion hosts when connecting via SOCKS proxy
@@ -396,7 +401,7 @@ type TransportSession msg = (UserId, ProtoServer msg, Maybe EntityId)
--
-- A single queue can be used for multiple 'SMPClient' instances,
-- as 'SMPServerTransmission' includes server information.
getProtocolClient :: forall v err msg. Protocol v err msg => TVar ChaChaDRG -> TransportSession msg -> ProtocolClientConfig v -> Maybe (TBQueue (ServerTransmission v err msg)) -> (ProtocolClient v err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg))
getProtocolClient :: forall v err msg. Protocol v err msg => TVar ChaChaDRG -> TransportSession msg -> ProtocolClientConfig v -> Maybe (TBQueue (ServerTransmissionBatch v err msg)) -> (ProtocolClient v err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg))
getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, clientALPN, serverVRange, agreeSecret} msgQ disconnected = do
case chooseTransportHost networkConfig (host srv) of
Right useHost ->
@@ -498,38 +503,47 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize
maxCnt = smpPingCount networkConfig
process :: ProtocolClient v err msg -> IO ()
process c = forever $ atomically (readTBQueue $ rcvQ $ client_ c) >>= mapM_ (processMsg c)
process c = forever $ atomically (readTBQueue $ rcvQ $ client_ c) >>= processMsgs c
processMsg :: ProtocolClient v err msg -> SignedTransmission err msg -> IO ()
processMsg c@ProtocolClient {client_ = PClient {sentCommands}} (_, _, (corrId, entId, respOrErr))
| not $ B.null $ bs corrId =
processMsgs :: ProtocolClient v err msg -> NonEmpty (SignedTransmission err msg) -> IO ()
processMsgs c ts = do
tsVar <- newTVarIO []
mapM_ (processMsg c tsVar) ts
ts' <- readTVarIO tsVar
forM_ msgQ $ \q ->
mapM_ (atomically . writeTBQueue q . serverTransmission c) (L.nonEmpty ts')
processMsg :: ProtocolClient v err msg -> TVar [(EntityId, ServerTransmission err msg)] -> SignedTransmission err msg -> IO ()
processMsg ProtocolClient {client_ = PClient {sentCommands}} tsVar (_, _, (corrId, entId, respOrErr))
| B.null $ bs corrId = sendMsg $ STEvent clientResp
| otherwise =
atomically (TM.lookup corrId sentCommands) >>= \case
Nothing -> sendMsg TTUncorrelatedResponse
Nothing -> sendMsg $ STUnexpectedError unexpected
Just Request {entityId, command, pending, responseVar} -> do
wasPending <-
atomically $ do
TM.delete corrId sentCommands
ifM
(swapTVar pending False)
(True <$ tryPutTMVar responseVar (response entityId))
(True <$ tryPutTMVar responseVar (if entityId == entId then clientResp else Left unexpected))
(pure False)
unless wasPending $ sendMsg $ if entityId == entId then TTExpiredResponse command else TTUncorrelatedResponse
| otherwise = sendMsg TTEvent
unless wasPending $ sendMsg $ if entityId == entId then STResponse command clientResp else STUnexpectedError unexpected
where
response entityId
| entityId == entId = clientResp
| otherwise = Left . PCEUnexpectedResponse $ bshow respOrErr
unexpected = unexpectedResponse respOrErr
clientResp = case respOrErr of
Left e -> Left $ PCEResponseError e
Right r -> case protocolError r of
Just e -> Left $ PCEProtocolError e
_ -> Right r
sendMsg :: TransmissionType msg -> IO ()
sendMsg tType = case msgQ of
Just q -> atomically $ writeTBQueue q $ serverTransmission c tType entId clientResp
sendMsg :: ServerTransmission err msg -> IO ()
sendMsg t = case msgQ of
Just _ -> atomically $ modifyTVar' tsVar ((entId, t) :)
Nothing -> case clientResp of
Left e -> logError $ "SMP client error: " <> tshow e
Right _ -> logWarn $ "SMP client unprocessed event"
Left e -> logError ("SMP client error: " <> tshow e)
Right _ -> logWarn ("SMP client unprocessed event")
unexpectedResponse :: Show r => r -> ProtocolClientError err
unexpectedResponse = PCEUnexpectedResponse . B.pack . take 32 . show
proxyUsername :: TransportSession msg -> ByteString
proxyUsername (userId, _, entityId_) = C.sha256Hash $ bshow userId <> maybe "" (":" <>) entityId_
@@ -585,7 +599,7 @@ smpProxyError :: SMPClientError -> ErrorType
smpProxyError = \case
PCEProtocolError e -> PROXY $ PROTOCOL e
PCEResponseError e -> PROXY $ BROKER $ RESPONSE $ B.unpack $ strEncode e
PCEUnexpectedResponse s -> PROXY $ BROKER $ UNEXPECTED $ B.unpack $ B.take 32 s
PCEUnexpectedResponse e -> PROXY $ BROKER $ UNEXPECTED $ B.unpack e
PCEResponseTimeout -> PROXY $ BROKER TIMEOUT
PCENetworkError -> PROXY $ BROKER NETWORK
PCEIncompatibleHost -> PROXY $ BROKER HOST
@@ -606,7 +620,7 @@ createSMPQueue ::
createSMPQueue c (rKey, rpKey) dhKey auth subMode =
sendSMPCommand c (Just rpKey) "" (NEW rKey dhKey auth subMode) >>= \case
IDS qik -> pure qik
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
-- | Subscribe to the SMP queue.
--
@@ -617,7 +631,7 @@ subscribeSMPQueue c@ProtocolClient {client_ = PClient {sendPings}} rpKey rId = d
sendSMPCommand c (Just rpKey) rId SUB >>= \case
OK -> pure ()
cmd@MSG {} -> liftIO $ writeSMPMessage c rId cmd
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
-- | Subscribe to multiple SMP queues batching commands if supported.
subscribeSMPQueues :: SMPClient -> NonEmpty (RcvPrivateAuthKey, RecipientId) -> IO (NonEmpty (Either SMPClientError ()))
@@ -637,15 +651,15 @@ processSUBResponse :: SMPClient -> Response ErrorType BrokerMsg -> IO (Either SM
processSUBResponse c (Response rId r) = case r of
Right OK -> pure $ Right ()
Right cmd@MSG {} -> writeSMPMessage c rId cmd $> Right ()
Right r' -> pure . Left . PCEUnexpectedResponse $ bshow r'
Right r' -> pure . Left $ unexpectedResponse r'
Left e -> pure $ Left e
writeSMPMessage :: SMPClient -> RecipientId -> BrokerMsg -> IO ()
writeSMPMessage c rId msg = atomically $ mapM_ (`writeTBQueue` serverTransmission c TTEvent rId (Right msg)) (msgQ $ client_ c)
writeSMPMessage c rId msg = atomically $ mapM_ (`writeTBQueue` serverTransmission c [(rId, STEvent (Right msg))]) (msgQ $ client_ c)
serverTransmission :: ProtocolClient v err msg -> TransmissionType msg -> RecipientId -> Either (ProtocolClientError err) msg -> ServerTransmission v err msg
serverTransmission ProtocolClient {thParams = THandleParams {thVersion, sessionId}, client_ = PClient {transportSession}} tType entityId msgOrErr =
(transportSession, thVersion, sessionId, tType, entityId, msgOrErr)
serverTransmission :: ProtocolClient v err msg -> NonEmpty (RecipientId, ServerTransmission err msg) -> ServerTransmissionBatch v err msg
serverTransmission ProtocolClient {thParams = THandleParams {thVersion, sessionId}, client_ = PClient {transportSession}} ts =
(transportSession, thVersion, sessionId, ts)
-- | Get message from SMP queue. The server returns ERR PROHIBITED if a client uses SUB and GET via the same transport connection for the same queue
--
@@ -655,7 +669,7 @@ getSMPMessage c rpKey rId =
sendSMPCommand c (Just rpKey) rId GET >>= \case
OK -> pure Nothing
cmd@(MSG msg) -> liftIO (writeSMPMessage c rId cmd) $> Just msg
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
-- | Subscribe to the SMP queue notifications.
--
@@ -683,7 +697,7 @@ enableSMPQueueNotifications :: SMPClient -> RcvPrivateAuthKey -> RecipientId ->
enableSMPQueueNotifications c rpKey rId notifierKey rcvNtfPublicDhKey =
sendSMPCommand c (Just rpKey) rId (NKEY notifierKey rcvNtfPublicDhKey) >>= \case
NID nId rcvNtfSrvPublicDhKey -> pure (nId, rcvNtfSrvPublicDhKey)
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
-- | Enable notifications for the multiple queues for push notifications server.
enableSMPQueuesNtfs :: SMPClient -> NonEmpty (RcvPrivateAuthKey, RecipientId, NtfPublicAuthKey, RcvNtfPublicDhKey) -> IO (NonEmpty (Either SMPClientError (NotifierId, RcvNtfPublicDhKey)))
@@ -692,7 +706,7 @@ enableSMPQueuesNtfs c qs = L.map process <$> sendProtocolCommands c cs
cs = L.map (\(rpKey, rId, notifierKey, rcvNtfPublicDhKey) -> (Just rpKey, rId, Cmd SRecipient $ NKEY notifierKey rcvNtfPublicDhKey)) qs
process (Response _ r) = case r of
Right (NID nId rcvNtfSrvPublicDhKey) -> Right (nId, rcvNtfSrvPublicDhKey)
Right r' -> Left . PCEUnexpectedResponse $ bshow r'
Right r' -> Left $ unexpectedResponse r'
Left e -> Left e
-- | Disable notifications for the queue for push notifications server.
@@ -714,7 +728,7 @@ sendSMPMessage :: SMPClient -> Maybe SndPrivateAuthKey -> SenderId -> MsgFlags -
sendSMPMessage c spKey sId flags msg =
sendSMPCommand c spKey sId (SEND flags msg) >>= \case
OK -> pure ()
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
-- | Acknowledge message delivery (server deletes the message).
--
@@ -724,7 +738,7 @@ ackSMPMessage c rpKey rId msgId =
sendSMPCommand c (Just rpKey) rId (ACK msgId) >>= \case
OK -> return ()
cmd@MSG {} -> liftIO $ writeSMPMessage c rId cmd
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
-- | Irreversibly suspend SMP queue.
-- The existing messages from the queue will still be delivered.
@@ -756,7 +770,7 @@ connectSMPProxiedRelay c@ProtocolClient {client_ = PClient {tcpConnectTimeout, t
case supportedClientSMPRelayVRange `compatibleVersion` vr of
Nothing -> throwE $ transportErr TEVersion
Just (Compatible v) -> liftEitherWith (const $ transportErr $ TEHandshake IDENTITY) $ ProxiedRelay sId v <$> validateRelay chain key
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
| otherwise = throwE $ PCETransportError TEVersion
where
tOut = Just $ tcpConnectTimeout + tcpTimeout
@@ -862,14 +876,14 @@ proxySMPMessage c@ProtocolClient {thParams = proxyThParams, client_ = PClient {c
(_auth, _signed, (_c, _e, cmd)) -> case cmd of
Right OK -> pure $ Right ()
Right (ERR e) -> throwE $ PCEProtocolError e -- this is the error from the destination relay
Right e -> throwE $ PCEUnexpectedResponse $ B.take 32 $ bshow e
Right r' -> throwE $ unexpectedResponse r'
Left e -> throwE $ PCEResponseError e
_ -> throwE $ PCETransportError TEBadBlock
ERR e -> pure . Left $ ProxyProtocolError e -- this will not happen, this error is returned via Left
_ -> pure . Left $ ProxyUnexpectedResponse $ take 32 $ show r
Left e -> case e of
PCEProtocolError e' -> pure . Left $ ProxyProtocolError e'
PCEUnexpectedResponse r -> pure . Left $ ProxyUnexpectedResponse $ B.unpack r
PCEUnexpectedResponse e' -> pure . Left $ ProxyUnexpectedResponse $ B.unpack e'
PCEResponseError e' -> pure . Left $ ProxyResponseError e'
_ -> throwE e
@@ -894,13 +908,13 @@ forwardSMPMessage c@ProtocolClient {thParams, client_ = PClient {clientCorrId =
r' <- liftEitherWith PCECryptoError $ C.cbDecryptNoPad sessSecret (C.reverseNonce nonce) efr
FwdResponse {fwdCorrId = _, fwdResponse} <- liftEitherWith (const $ PCEResponseError BLOCK) $ smpDecode r'
pure fwdResponse
r -> throwE . PCEUnexpectedResponse $ B.take 32 $ bshow r
r -> throwE $ unexpectedResponse r
okSMPCommand :: PartyI p => Command p -> SMPClient -> C.APrivateAuthKey -> QueueId -> ExceptT SMPClientError IO ()
okSMPCommand cmd c pKey qId =
sendSMPCommand c (Just pKey) qId cmd >>= \case
OK -> return ()
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
okSMPCommands :: PartyI p => Command p -> SMPClient -> NonEmpty (C.APrivateAuthKey, QueueId) -> IO (NonEmpty (Either SMPClientError ()))
okSMPCommands cmd c qs = L.map process <$> sendProtocolCommands c cs
@@ -909,7 +923,7 @@ okSMPCommands cmd c qs = L.map process <$> sendProtocolCommands c cs
cs = L.map (\(pKey, qId) -> (Just pKey, qId, aCmd)) qs
process (Response _ r) = case r of
Right OK -> Right ()
Right r' -> Left . PCEUnexpectedResponse $ bshow r'
Right r' -> Left $ unexpectedResponse r'
Left e -> Left e
-- | Send SMP command

View File

@@ -99,7 +99,7 @@ defaultSMPClientAgentConfig =
data SMPClientAgent = SMPClientAgent
{ agentCfg :: SMPClientAgentConfig,
active :: TVar Bool,
msgQ :: TBQueue (ServerTransmission SMPVersion ErrorType BrokerMsg),
msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg),
agentQ :: TBQueue SMPClientAgentEvent,
randomDrg :: TVar ChaChaDRG,
smpClients :: TMap SMPServer SMPClientVar,

View File

@@ -12,7 +12,6 @@ import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Notifications.Protocol
import Simplex.Messaging.Notifications.Transport (NTFVersion, supportedClientNTFVRange, supportedNTFHandshakes)
import Simplex.Messaging.Protocol (ErrorType)
import Simplex.Messaging.Util (bshow)
type NtfClient = ProtocolClient NTFVersion ErrorType NtfResponse
@@ -25,7 +24,7 @@ ntfRegisterToken :: NtfClient -> C.APrivateAuthKey -> NewNtfEntity 'Token -> Exc
ntfRegisterToken c pKey newTkn =
sendNtfCommand c (Just pKey) "" (TNEW newTkn) >>= \case
NRTknId tknId dhKey -> pure (tknId, dhKey)
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
ntfVerifyToken :: NtfClient -> C.APrivateAuthKey -> NtfTokenId -> NtfRegCode -> ExceptT NtfClientError IO ()
ntfVerifyToken c pKey tknId code = okNtfCommand (TVFY code) c pKey tknId
@@ -34,7 +33,7 @@ ntfCheckToken :: NtfClient -> C.APrivateAuthKey -> NtfTokenId -> ExceptT NtfClie
ntfCheckToken c pKey tknId =
sendNtfCommand c (Just pKey) tknId TCHK >>= \case
NRTkn stat -> pure stat
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
ntfReplaceToken :: NtfClient -> C.APrivateAuthKey -> NtfTokenId -> DeviceToken -> ExceptT NtfClientError IO ()
ntfReplaceToken c pKey tknId token = okNtfCommand (TRPL token) c pKey tknId
@@ -49,13 +48,13 @@ ntfCreateSubscription :: NtfClient -> C.APrivateAuthKey -> NewNtfEntity 'Subscri
ntfCreateSubscription c pKey newSub =
sendNtfCommand c (Just pKey) "" (SNEW newSub) >>= \case
NRSubId subId -> pure subId
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
ntfCheckSubscription :: NtfClient -> C.APrivateAuthKey -> NtfSubscriptionId -> ExceptT NtfClientError IO NtfSubStatus
ntfCheckSubscription c pKey subId =
sendNtfCommand c (Just pKey) subId SCHK >>= \case
NRSub stat -> pure stat
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r
ntfDeleteSubscription :: NtfClient -> C.APrivateAuthKey -> NtfSubscriptionId -> ExceptT NtfClientError IO ()
ntfDeleteSubscription = okNtfCommand SDEL
@@ -68,4 +67,4 @@ okNtfCommand :: NtfEntityI e => NtfCommand e -> NtfClient -> C.APrivateAuthKey -
okNtfCommand cmd c pKey entId =
sendNtfCommand c (Just pKey) entId cmd >>= \case
NROk -> return ()
r -> throwE . PCEUnexpectedResponse $ bshow r
r -> throwE $ unexpectedResponse r

View File

@@ -31,7 +31,7 @@ import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
import Data.Time.Clock.System (getSystemTime)
import Data.Time.Format.ISO8601 (iso8601Show)
import Network.Socket (ServiceName)
import Simplex.Messaging.Client (ProtocolClientError (..), SMPClientError)
import Simplex.Messaging.Client (ProtocolClientError (..), SMPClientError, ServerTransmission (..))
import Simplex.Messaging.Client.Agent
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Encoding.String
@@ -220,23 +220,27 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
receiveSMP :: M ()
receiveSMP = forever $ do
((_, srv, _), _, _, _tType, ntfId, msgOrErr) <- atomically $ readTBQueue msgQ
let smpQueue = SMPQueueNtf srv ntfId
case msgOrErr of
Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do
ntfTs <- liftIO getSystemTime
st <- asks store
NtfPushServer {pushQ} <- asks pushServer
stats <- asks serverStats
atomically $ updatePeriodStats (activeSubs stats) ntfId
atomically $
findNtfSubscriptionToken st smpQueue
>>= mapM_ (\tkn -> writeTBQueue pushQ (tkn, PNMessage PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta}))
incNtfStat ntfReceived
Right SMP.END -> updateSubStatus smpQueue NSEnd
Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e
Right _ -> logError $ "SMP server unexpected response"
Left e -> logError $ "SMP client error: " <> tshow e
((_, srv, _), _, _, ts) <- atomically $ readTBQueue msgQ
forM ts $ \(ntfId, t) -> case t of
STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen
STResponse {} -> pure () -- it was already reported as timeout error
STEvent msgOrErr -> do
let smpQueue = SMPQueueNtf srv ntfId
case msgOrErr of
Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do
ntfTs <- liftIO getSystemTime
st <- asks store
NtfPushServer {pushQ} <- asks pushServer
stats <- asks serverStats
atomically $ updatePeriodStats (activeSubs stats) ntfId
atomically $
findNtfSubscriptionToken st smpQueue
>>= mapM_ (\tkn -> writeTBQueue pushQ (tkn, PNMessage PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta}))
incNtfStat ntfReceived
Right SMP.END -> updateSubStatus smpQueue NSEnd
Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e
Right _ -> logError $ "SMP server unexpected response"
Left e -> logError $ "SMP client error: " <> tshow e
receiveAgent =
forever $
@@ -248,7 +252,6 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
forM_ subs $ \(_, ntfId) -> do
let smpQueue = SMPQueueNtf srv ntfId
updateSubStatus smpQueue NSInactive
CAResubscribed srv subs -> do
forM_ subs $ \(_, ntfId) -> updateSubStatus (SMPQueueNtf srv ntfId) NSActive
logSubStatus srv "resubscribed" $ length subs

View File

@@ -120,7 +120,7 @@ deliverMessageViaProxy proxyServ relayServ alg msg msg' = do
-- send via proxy to unsecured queue
Right () <- proxySMPMessage pc sess Nothing sndId noMsgFlags msg
-- receive 1
(_tSess, _v, _sid, _isResp, _entId, Right (SMP.MSG RcvMessage {msgId, msgBody = EncRcvMsgBody encBody})) <- atomically $ readTBQueue msgQ
(_tSess, _v, _sid, [(_entId, STEvent (Right (SMP.MSG RcvMessage {msgId, msgBody = EncRcvMsgBody encBody})))]) <- atomically $ readTBQueue msgQ
liftIO $ dec msgId encBody `shouldBe` Right msg
ackSMPMessage rc rPriv rcvId msgId
-- secure queue
@@ -129,7 +129,7 @@ deliverMessageViaProxy proxyServ relayServ alg msg msg' = do
-- send via proxy to secured queue
Right () <- proxySMPMessage pc sess (Just sPriv) sndId noMsgFlags msg'
-- receive 2
(_tSess, _v, _sid, _isResp, _entId, Right (SMP.MSG RcvMessage {msgId = msgId', msgBody = EncRcvMsgBody encBody'})) <- atomically $ readTBQueue msgQ
(_tSess, _v, _sid, [(_entId, STEvent (Right (SMP.MSG RcvMessage {msgId = msgId', msgBody = EncRcvMsgBody encBody'})))]) <- atomically $ readTBQueue msgQ
liftIO $ dec msgId' encBody' `shouldBe` Right msg'
ackSMPMessage rc rPriv rcvId msgId'