diff --git a/src/Simplex/FileTransfer/Client.hs b/src/Simplex/FileTransfer/Client.hs index 7875542a6..4fb18c27a 100644 --- a/src/Simplex/FileTransfer/Client.hs +++ b/src/Simplex/FileTransfer/Client.hs @@ -14,6 +14,7 @@ module Simplex.FileTransfer.Client where import Control.Logger.Simple import Control.Monad import Control.Monad.Except +import Control.Monad.Trans.Except import Crypto.Random (ChaChaDRG) import Data.Bifunctor (first) import Data.ByteString.Builder (Builder, byteString) @@ -38,6 +39,7 @@ import Simplex.Messaging.Client defaultNetworkConfig, proxyUsername, transportClientConfig, + unexpectedResponse, ) import Simplex.Messaging.Client.Agent () import qualified Simplex.Messaging.Crypto as C @@ -56,7 +58,7 @@ import Simplex.Messaging.Transport.Client (TransportClientConfig, TransportHost, import Simplex.Messaging.Transport.HTTP2 import Simplex.Messaging.Transport.HTTP2.Client import Simplex.Messaging.Transport.HTTP2.File -import Simplex.Messaging.Util (bshow, liftEitherWith, liftError', tshow, whenM) +import Simplex.Messaging.Util (liftEitherWith, liftError', tshow, whenM) import Simplex.Messaging.Version import UnliftIO import UnliftIO.Directory @@ -228,13 +230,13 @@ createXFTPChunk :: createXFTPChunk c spKey file rcps auth_ = sendXFTPCommand c spKey "" (FNEW file rcps auth_) Nothing >>= \case (FRSndIds sId rIds, body) -> noFile body (sId, rIds) - (r, _) -> throwError . PCEUnexpectedResponse $ bshow r + (r, _) -> throwE $ unexpectedResponse r addXFTPRecipients :: XFTPClient -> C.APrivateAuthKey -> XFTPFileId -> NonEmpty C.APublicAuthKey -> ExceptT XFTPClientError IO (NonEmpty RecipientId) addXFTPRecipients c spKey fId rcps = sendXFTPCommand c spKey fId (FADD rcps) Nothing >>= \case (FRRcvIds rIds, body) -> noFile body rIds - (r, _) -> throwError . PCEUnexpectedResponse $ bshow r + (r, _) -> throwE $ unexpectedResponse r uploadXFTPChunk :: XFTPClient -> C.APrivateAuthKey -> XFTPFileId -> XFTPChunkSpec -> ExceptT XFTPClientError IO () uploadXFTPChunk c spKey fId chunkSpec = @@ -262,7 +264,7 @@ downloadXFTPChunk g c@XFTPClient {config} rpKey fId chunkSpec@XFTPRcvChunkSpec { receiveEncFile chunkPart cbState chunkSpec `catchError` \e -> whenM (doesFileExist filePath) (removeFile filePath) >> throwError e _ -> throwError $ PCEResponseError NO_FILE - (r, _) -> throwError . PCEUnexpectedResponse $ bshow r + (r, _) -> throwE $ unexpectedResponse r xftpReqTimeout :: XFTPClientConfig -> Maybe Word32 -> Int xftpReqTimeout cfg@XFTPClientConfig {xftpNetworkConfig = NetworkConfig {tcpTimeout}} chunkSize_ = @@ -286,12 +288,12 @@ pingXFTP c@XFTPClient {thParams} = do (r, _) <- sendXFTPTransmission c t Nothing case r of FRPong -> pure () - _ -> throwError $ PCEUnexpectedResponse $ bshow r + _ -> throwE $ unexpectedResponse r okResponse :: (FileResponse, HTTP2Body) -> ExceptT XFTPClientError IO () okResponse = \case (FROk, body) -> noFile body () - (r, _) -> throwError . PCEUnexpectedResponse $ bshow r + (r, _) -> throwE $ unexpectedResponse r -- TODO this currently does not check anything because response size is not set and bodyPart is always Just noFile :: HTTP2Body -> a -> ExceptT XFTPClientError IO a diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 7bcec443b..3096291de 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -159,7 +159,7 @@ import Simplex.Messaging.Agent.Store import Simplex.Messaging.Agent.Store.SQLite import qualified Simplex.Messaging.Agent.Store.SQLite.DB as DB import qualified Simplex.Messaging.Agent.Store.SQLite.Migrations as Migrations -import Simplex.Messaging.Client (ProtocolClient (..), ServerTransmission, TransmissionType (..)) +import Simplex.Messaging.Client (ProtocolClient (..), SMPClientError, ServerTransmission (..), ServerTransmissionBatch, temporaryClientError, unexpectedResponse) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Crypto.File (CryptoFile, CryptoFileArgs) import Simplex.Messaging.Crypto.Ratchet (PQEncryption, PQSupport (..), pattern PQEncOff, pattern PQEncOn, pattern PQSupportOff, pattern PQSupportOn) @@ -170,7 +170,7 @@ import Simplex.Messaging.Notifications.Protocol (DeviceToken, NtfRegCode (NtfReg import Simplex.Messaging.Notifications.Server.Push.APNS (PNMessageData (..)) import Simplex.Messaging.Notifications.Types import Simplex.Messaging.Parsers (parse) -import Simplex.Messaging.Protocol (BrokerMsg, EntityId, ErrorType (AUTH), MsgBody, MsgFlags (..), NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SProtocolType (..), SndPublicAuthKey, SubscriptionMode (..), UserProtocol, VersionSMPC, XFTPServerWithAuth) +import Simplex.Messaging.Protocol (BrokerMsg, Cmd (..), EntityId, ErrorType (AUTH), MsgBody, MsgFlags (..), NtfServer, ProtoServerWithAuth, ProtocolTypeI (..), SMPMsgMeta, SParty (..), SProtocolType (..), SndPublicAuthKey, SubscriptionMode (..), UserProtocol, VersionSMPC, XFTPServerWithAuth) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.ServiceScheme (ServiceScheme (..)) import qualified Simplex.Messaging.TMap as TM @@ -1998,14 +1998,10 @@ getSMPServer c userId = withUserServers c userId pickServer {-# INLINE getSMPServer #-} subscriber :: AgentClient -> AM' () -subscriber c@AgentClient {subQ, msgQ} = forever $ do +subscriber c@AgentClient {msgQ} = forever $ do t <- atomically $ readTBQueue msgQ agentOperationBracket c AORcvNetwork waitUntilActive $ - tryAgentError' (processSMPTransmission c t) >>= \case - Left e -> do - logError $ tshow e - atomically $ writeTBQueue subQ ("", "", APC SAEConn $ ERR e) - Right _ -> return () + processSMPTransmissions c t cleanupManager :: AgentClient -> AM' () cleanupManager c@AgentClient {subQ} = do @@ -2079,28 +2075,72 @@ cleanupManager c@AgentClient {subQ} = do data ACKd = ACKd | ACKPending --- | make sure to ACK or throw in each message processing branch --- it cannot be finally, unfortunately, as sometimes it needs to be ACK+DEL -processSMPTransmission :: AgentClient -> ServerTransmission SMPVersion ErrorType BrokerMsg -> AM () -processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, sessId, tType, rId, cmd) = do - (rq, SomeConn _ conn) <- withStore c (\db -> getRcvConn db srv rId) - processSMP rq conn $ toConnData conn +-- | Make sure to ACK or throw in each message processing branch +-- It cannot be finally, as sometimes it needs to be ACK+DEL, +-- and sometimes ACK has to be sent from the consumer. +processSMPTransmissions :: AgentClient -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> AM' () +processSMPTransmissions c@AgentClient {subQ} (tSess@(_, srv, _), _v, sessId, ts) = do + upConnIds <- newTVarIO [] + forM_ ts $ \(entId, t) -> case t of + STEvent msgOrErr -> + withRcvConn entId $ \rq@RcvQueue {connId} conn -> case msgOrErr of + Right msg -> processSMP rq conn (toConnData conn) msg + Left e -> lift $ notifyErr connId e + STResponse (Cmd SRecipient cmd) respOrErr -> + withRcvConn entId $ \rq conn -> case cmd of + -- TODO process expired responses to ACK and DEL + SMP.SUB -> case respOrErr of + Right SMP.OK -> processSubOk rq upConnIds + Right msg@SMP.MSG {} -> do + processSubOk rq upConnIds + processSMP rq conn (toConnData conn) msg + Right r -> processSubErr rq $ unexpectedResponse r + Left e -> unless (temporaryClientError e) $ processSubErr rq e -- timeout/network was already reported + _ -> pure () + STResponse {} -> pure () -- TODO process expired responses to sent messages + STUnexpectedError e -> do + logServer "<--" c srv entId $ "error: " <> bshow e + notifyErr "" e + connIds <- readTVarIO upConnIds + unless (null connIds) $ notify' "" $ UP srv connIds where - processSMP :: forall c. RcvQueue -> Connection c -> ConnData -> AM () + withRcvConn :: SMP.RecipientId -> (forall c. RcvQueue -> Connection c -> AM ()) -> AM' () + withRcvConn rId a = do + tryAgentError' (withStore c $ \db -> getRcvConn db srv rId) >>= \case + Left e -> notify' "" (ERR e) + Right (rq@RcvQueue {connId}, SomeConn _ conn) -> + tryAgentError' (a rq conn) >>= \case + Left e -> notify' connId (ERR e) + Right () -> pure () + processSubOk :: RcvQueue -> TVar [ConnId] -> AM () + processSubOk rq@RcvQueue {connId} upConnIds = + atomically . whenM (isPendingSub connId) $ do + addSubscription c rq + modifyTVar' upConnIds (connId :) + processSubErr :: RcvQueue -> SMPClientError -> AM () + processSubErr rq@RcvQueue {connId} e = do + atomically . whenM (isPendingSub connId) $ failSubscription c rq e + lift $ notifyErr connId e + isPendingSub connId = (&&) <$> hasPendingSubscription c connId <*> activeClientSession c tSess sessId + notify' :: forall e m. (AEntityI e, MonadIO m) => ConnId -> ACommand 'Agent e -> m () + notify' connId msg = atomically $ writeTBQueue subQ ("", connId, APC (sAEntity @e) msg) + notifyErr :: ConnId -> SMPClientError -> AM' () + notifyErr connId = notify' connId . ERR . protocolClientError SMP (B.unpack $ strEncode srv) + processSMP :: forall c. RcvQueue -> Connection c -> ConnData -> BrokerMsg -> AM () processSMP - rq@RcvQueue {e2ePrivKey, e2eDhSecret, status} + rq@RcvQueue {rcvId = rId, e2ePrivKey, e2eDhSecret, status} conn - cData@ConnData {userId, connId, connAgentVersion, ratchetSyncState = rss} = - withConnLock c connId "processSMP" $ case cmd of - Right r@(SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId}) -> + cData@ConnData {userId, connId, connAgentVersion, ratchetSyncState = rss} + smpMsg = + withConnLock c connId "processSMP" $ case smpMsg of + SMP.MSG msg@SMP.RcvMessage {msgId = srvMsgId} -> void . handleNotifyAck $ do - isGET <- atomically $ hasGetLock c rq - unless isGET $ checkExpiredResponse r msg' <- decryptSMPMessage rq msg ack' <- handleNotifyAck $ case msg' of SMP.ClientRcvMsgBody {msgTs = srvTs, msgFlags, msgBody} -> processClientMsg srvTs msgFlags msgBody SMP.ClientRcvMsgQuota {} -> queueDrained >> ack - when isGET $ notify (MSGNTF $ SMP.rcvMessageMeta srvMsgId msg') + whenM (atomically $ hasGetLock c rq) $ + notify (MSGNTF $ SMP.rcvMessageMeta srvMsgId msg') pure ack' where queueDrained = case conn of @@ -2259,29 +2299,23 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, ackDel aId = enqueueCmd (ICAckDel rId srvMsgId aId) $> ACKd handleNotifyAck :: AM ACKd -> AM ACKd handleNotifyAck m = m `catchAgentError` \e -> notify (ERR e) >> ack - Right SMP.END -> - atomically (TM.lookup tSess smpClients $>>= (tryReadTMVar . sessionVar) >>= processEND) - >>= logServer "<--" c srv rId + SMP.END -> + atomically (TM.lookup tSess (smpClients c) $>>= (tryReadTMVar . sessionVar) >>= processEND) + >>= notifyEnd where processEND = \case Just (Right clnt) - | sessId == sessionId (thParams $ connectedClient clnt) -> do - removeSubscription c connId - notify' END - pure "END" - | otherwise -> ignored - _ -> ignored - ignored = pure "END from disconnected client - ignored" - Right (SMP.ERR e) -> notify $ ERR $ SMP (B.unpack $ strEncode srv) e - Right r@SMP.OK -> checkExpiredResponse r - Right r -> unexpected r - Left e -> notify $ ERR $ protocolClientError SMP (B.unpack $ strEncode srv) e + | sessId == sessionId (thParams $ connectedClient clnt) -> + removeSubscription c connId $> True + _ -> pure False + notifyEnd removed + | removed = notify END >> logServer "<--" c srv rId "END" + | otherwise = logServer "<--" c srv rId "END from disconnected client - ignored" + SMP.ERR e -> notify $ ERR $ SMP (B.unpack $ strEncode srv) e + r -> unexpected r where - notify :: forall e m. MonadIO m => AEntityI e => ACommand 'Agent e -> m () - notify = atomically . notify' - - notify' :: forall e. AEntityI e => ACommand 'Agent e -> STM () - notify' msg = writeTBQueue subQ ("", connId, APC (sAEntity @e) msg) + notify :: forall e m. (AEntityI e, MonadIO m) => ACommand 'Agent e -> m () + notify = notify' connId prohibited :: AM () prohibited = notify . ERR $ AGENT A_PROHIBITED @@ -2291,25 +2325,10 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, unexpected :: BrokerMsg -> AM () unexpected r = do - logServer "<--" c srv rId $ "unexpected: " <> bshow cmd + logServer "<--" c srv rId $ "unexpected: " <> bshow r -- TODO add extended information about transmission type once UNEXPECTED has string notify . ERR $ BROKER (B.unpack $ strEncode srv) $ UNEXPECTED (take 32 $ show r) - checkExpiredResponse :: BrokerMsg -> AM () - checkExpiredResponse r = case tType of - TTEvent -> pure () - TTUncorrelatedResponse -> unexpected r - TTExpiredResponse (SMP.Cmd _ cmd') -> case cmd' of - SMP.SUB -> do - added <- - atomically $ - ifM - ((&&) <$> hasPendingSubscription c connId <*> activeClientSession c tSess sessId) - (True <$ addSubscription c rq) - (pure False) - when added $ notify $ UP srv [connId] - _ -> pure () - decryptClientMessage :: C.DhSecretX25519 -> SMP.ClientMsgEnvelope -> AM (SMP.PrivHeader, AgentMsgEnvelope) decryptClientMessage e2eDh SMP.ClientMsgEnvelope {cmNonce, cmEncBody} = do clientMsg <- agentCbDecrypt e2eDh cmNonce cmEncBody diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index df6105766..64f8c172f 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -41,6 +41,7 @@ module Simplex.Messaging.Agent.Client getQueueMessage, decryptSMPMessage, addSubscription, + failSubscription, addNewQueueSubscription, getSubscriptions, sendConfirmation, @@ -268,7 +269,7 @@ data AgentClient = AgentClient active :: TVar Bool, rcvQ :: TBQueue (ATransmission 'Client), subQ :: TBQueue (ATransmission 'Agent), - msgQ :: TBQueue (ServerTransmission SMPVersion ErrorType BrokerMsg), + msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg), smpServers :: TMap UserId (NonEmpty SMPServerWithAuth), smpClients :: TMap SMPTransportSession SMPClientVar, -- smpProxiedRelays: @@ -1079,7 +1080,7 @@ protocolClientError :: (Show err, Encoding err) => (HostName -> err -> AgentErro protocolClientError protocolError_ host = \case PCEProtocolError e -> protocolError_ host e PCEResponseError e -> BROKER host $ RESPONSE $ B.unpack $ smpEncode e - PCEUnexpectedResponse r -> BROKER host $ UNEXPECTED $ take 32 $ show r + PCEUnexpectedResponse e -> BROKER host $ UNEXPECTED $ B.unpack e PCEResponseTimeout -> BROKER host TIMEOUT PCENetworkError -> BROKER host NETWORK PCEIncompatibleHost -> BROKER host HOST @@ -1269,9 +1270,8 @@ newRcvQueue c userId connId (ProtoServerWithAuth srv auth) vRange subMode = do processSubResult :: AgentClient -> RcvQueue -> Either SMPClientError () -> STM () processSubResult c rq@RcvQueue {connId} = \case Left e -> - unless (temporaryClientError e) $ do - RQ.deleteQueue rq (pendingSubs c) - TM.insert (RQ.qKey rq) e (removedSubs c) + unless (temporaryClientError e) $ + failSubscription c rq e Right () -> whenM (hasPendingSubscription c connId) $ addSubscription c rq @@ -1391,6 +1391,11 @@ addSubscription c rq@RcvQueue {connId} = do RQ.addQueue rq $ activeSubs c RQ.deleteQueue rq $ pendingSubs c +failSubscription :: AgentClient -> RcvQueue -> SMPClientError -> STM () +failSubscription c rq e = do + RQ.deleteQueue rq (pendingSubs c) + TM.insert (RQ.qKey rq) e (removedSubs c) + addPendingSubscription :: AgentClient -> RcvQueue -> STM () addPendingSubscription c rq@RcvQueue {connId} = do modifyTVar' (subscrConns c) $ S.insert connId diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 97a2867ca..2e05812cb 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -65,6 +65,7 @@ module Simplex.Messaging.Client ProtocolClientError (..), SMPClientError, ProxyClientError (..), + unexpectedResponse, ProtocolClientConfig (..), NetworkConfig (..), TransportSessionMode (..), @@ -80,8 +81,8 @@ module Simplex.Messaging.Client proxyUsername, temporaryClientError, smpProxyError, - ServerTransmission, - TransmissionType (..), + ServerTransmissionBatch, + ServerTransmission (..), ClientCommand, -- * For testing @@ -155,7 +156,7 @@ data PClient v err msg = PClient sentCommands :: TMap CorrId (Request err msg), sndQ :: TBQueue ByteString, rcvQ :: TBQueue (NonEmpty (SignedTransmission err msg)), - msgQ :: Maybe (TBQueue (ServerTransmission v err msg)) + msgQ :: Maybe (TBQueue (ServerTransmissionBatch v err msg)) } smpClientStub :: TVar ChaChaDRG -> ByteString -> VersionSMP -> Maybe (THandleAuth 'TClient) -> STM SMPClient @@ -206,10 +207,14 @@ type SMPClient = ProtocolClient SMPVersion ErrorType BrokerMsg -- | Type for client command data type ClientCommand msg = (Maybe C.APrivateAuthKey, EntityId, ProtoCommand msg) --- | Type synonym for transmission from some SPM server queue. -type ServerTransmission v err msg = (TransportSession msg, Version v, SessionId, TransmissionType msg, EntityId, Either (ProtocolClientError err) msg) +-- | Type synonym for transmission from SPM servers. +-- Batch response is presented as a single `ServerTransmissionBatch` tuple. +type ServerTransmissionBatch v err msg = (TransportSession msg, Version v, SessionId, NonEmpty (EntityId, ServerTransmission err msg)) -data TransmissionType msg = TTEvent | TTUncorrelatedResponse | TTExpiredResponse (ProtoCommand msg) +data ServerTransmission err msg + = STEvent (Either (ProtocolClientError err) msg) + | STResponse (ProtoCommand msg) (Either (ProtocolClientError err) msg) + | STUnexpectedError (ProtocolClientError err) data HostMode = -- | prefer (or require) onion hosts when connecting via SOCKS proxy @@ -396,7 +401,7 @@ type TransportSession msg = (UserId, ProtoServer msg, Maybe EntityId) -- -- A single queue can be used for multiple 'SMPClient' instances, -- as 'SMPServerTransmission' includes server information. -getProtocolClient :: forall v err msg. Protocol v err msg => TVar ChaChaDRG -> TransportSession msg -> ProtocolClientConfig v -> Maybe (TBQueue (ServerTransmission v err msg)) -> (ProtocolClient v err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg)) +getProtocolClient :: forall v err msg. Protocol v err msg => TVar ChaChaDRG -> TransportSession msg -> ProtocolClientConfig v -> Maybe (TBQueue (ServerTransmissionBatch v err msg)) -> (ProtocolClient v err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg)) getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, clientALPN, serverVRange, agreeSecret} msgQ disconnected = do case chooseTransportHost networkConfig (host srv) of Right useHost -> @@ -498,38 +503,47 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize maxCnt = smpPingCount networkConfig process :: ProtocolClient v err msg -> IO () - process c = forever $ atomically (readTBQueue $ rcvQ $ client_ c) >>= mapM_ (processMsg c) + process c = forever $ atomically (readTBQueue $ rcvQ $ client_ c) >>= processMsgs c - processMsg :: ProtocolClient v err msg -> SignedTransmission err msg -> IO () - processMsg c@ProtocolClient {client_ = PClient {sentCommands}} (_, _, (corrId, entId, respOrErr)) - | not $ B.null $ bs corrId = + processMsgs :: ProtocolClient v err msg -> NonEmpty (SignedTransmission err msg) -> IO () + processMsgs c ts = do + tsVar <- newTVarIO [] + mapM_ (processMsg c tsVar) ts + ts' <- readTVarIO tsVar + forM_ msgQ $ \q -> + mapM_ (atomically . writeTBQueue q . serverTransmission c) (L.nonEmpty ts') + + processMsg :: ProtocolClient v err msg -> TVar [(EntityId, ServerTransmission err msg)] -> SignedTransmission err msg -> IO () + processMsg ProtocolClient {client_ = PClient {sentCommands}} tsVar (_, _, (corrId, entId, respOrErr)) + | B.null $ bs corrId = sendMsg $ STEvent clientResp + | otherwise = atomically (TM.lookup corrId sentCommands) >>= \case - Nothing -> sendMsg TTUncorrelatedResponse + Nothing -> sendMsg $ STUnexpectedError unexpected Just Request {entityId, command, pending, responseVar} -> do wasPending <- atomically $ do TM.delete corrId sentCommands ifM (swapTVar pending False) - (True <$ tryPutTMVar responseVar (response entityId)) + (True <$ tryPutTMVar responseVar (if entityId == entId then clientResp else Left unexpected)) (pure False) - unless wasPending $ sendMsg $ if entityId == entId then TTExpiredResponse command else TTUncorrelatedResponse - | otherwise = sendMsg TTEvent + unless wasPending $ sendMsg $ if entityId == entId then STResponse command clientResp else STUnexpectedError unexpected where - response entityId - | entityId == entId = clientResp - | otherwise = Left . PCEUnexpectedResponse $ bshow respOrErr + unexpected = unexpectedResponse respOrErr clientResp = case respOrErr of Left e -> Left $ PCEResponseError e Right r -> case protocolError r of Just e -> Left $ PCEProtocolError e _ -> Right r - sendMsg :: TransmissionType msg -> IO () - sendMsg tType = case msgQ of - Just q -> atomically $ writeTBQueue q $ serverTransmission c tType entId clientResp + sendMsg :: ServerTransmission err msg -> IO () + sendMsg t = case msgQ of + Just _ -> atomically $ modifyTVar' tsVar ((entId, t) :) Nothing -> case clientResp of - Left e -> logError $ "SMP client error: " <> tshow e - Right _ -> logWarn $ "SMP client unprocessed event" + Left e -> logError ("SMP client error: " <> tshow e) + Right _ -> logWarn ("SMP client unprocessed event") + +unexpectedResponse :: Show r => r -> ProtocolClientError err +unexpectedResponse = PCEUnexpectedResponse . B.pack . take 32 . show proxyUsername :: TransportSession msg -> ByteString proxyUsername (userId, _, entityId_) = C.sha256Hash $ bshow userId <> maybe "" (":" <>) entityId_ @@ -585,7 +599,7 @@ smpProxyError :: SMPClientError -> ErrorType smpProxyError = \case PCEProtocolError e -> PROXY $ PROTOCOL e PCEResponseError e -> PROXY $ BROKER $ RESPONSE $ B.unpack $ strEncode e - PCEUnexpectedResponse s -> PROXY $ BROKER $ UNEXPECTED $ B.unpack $ B.take 32 s + PCEUnexpectedResponse e -> PROXY $ BROKER $ UNEXPECTED $ B.unpack e PCEResponseTimeout -> PROXY $ BROKER TIMEOUT PCENetworkError -> PROXY $ BROKER NETWORK PCEIncompatibleHost -> PROXY $ BROKER HOST @@ -606,7 +620,7 @@ createSMPQueue :: createSMPQueue c (rKey, rpKey) dhKey auth subMode = sendSMPCommand c (Just rpKey) "" (NEW rKey dhKey auth subMode) >>= \case IDS qik -> pure qik - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r -- | Subscribe to the SMP queue. -- @@ -617,7 +631,7 @@ subscribeSMPQueue c@ProtocolClient {client_ = PClient {sendPings}} rpKey rId = d sendSMPCommand c (Just rpKey) rId SUB >>= \case OK -> pure () cmd@MSG {} -> liftIO $ writeSMPMessage c rId cmd - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r -- | Subscribe to multiple SMP queues batching commands if supported. subscribeSMPQueues :: SMPClient -> NonEmpty (RcvPrivateAuthKey, RecipientId) -> IO (NonEmpty (Either SMPClientError ())) @@ -637,15 +651,15 @@ processSUBResponse :: SMPClient -> Response ErrorType BrokerMsg -> IO (Either SM processSUBResponse c (Response rId r) = case r of Right OK -> pure $ Right () Right cmd@MSG {} -> writeSMPMessage c rId cmd $> Right () - Right r' -> pure . Left . PCEUnexpectedResponse $ bshow r' + Right r' -> pure . Left $ unexpectedResponse r' Left e -> pure $ Left e writeSMPMessage :: SMPClient -> RecipientId -> BrokerMsg -> IO () -writeSMPMessage c rId msg = atomically $ mapM_ (`writeTBQueue` serverTransmission c TTEvent rId (Right msg)) (msgQ $ client_ c) +writeSMPMessage c rId msg = atomically $ mapM_ (`writeTBQueue` serverTransmission c [(rId, STEvent (Right msg))]) (msgQ $ client_ c) -serverTransmission :: ProtocolClient v err msg -> TransmissionType msg -> RecipientId -> Either (ProtocolClientError err) msg -> ServerTransmission v err msg -serverTransmission ProtocolClient {thParams = THandleParams {thVersion, sessionId}, client_ = PClient {transportSession}} tType entityId msgOrErr = - (transportSession, thVersion, sessionId, tType, entityId, msgOrErr) +serverTransmission :: ProtocolClient v err msg -> NonEmpty (RecipientId, ServerTransmission err msg) -> ServerTransmissionBatch v err msg +serverTransmission ProtocolClient {thParams = THandleParams {thVersion, sessionId}, client_ = PClient {transportSession}} ts = + (transportSession, thVersion, sessionId, ts) -- | Get message from SMP queue. The server returns ERR PROHIBITED if a client uses SUB and GET via the same transport connection for the same queue -- @@ -655,7 +669,7 @@ getSMPMessage c rpKey rId = sendSMPCommand c (Just rpKey) rId GET >>= \case OK -> pure Nothing cmd@(MSG msg) -> liftIO (writeSMPMessage c rId cmd) $> Just msg - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r -- | Subscribe to the SMP queue notifications. -- @@ -683,7 +697,7 @@ enableSMPQueueNotifications :: SMPClient -> RcvPrivateAuthKey -> RecipientId -> enableSMPQueueNotifications c rpKey rId notifierKey rcvNtfPublicDhKey = sendSMPCommand c (Just rpKey) rId (NKEY notifierKey rcvNtfPublicDhKey) >>= \case NID nId rcvNtfSrvPublicDhKey -> pure (nId, rcvNtfSrvPublicDhKey) - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r -- | Enable notifications for the multiple queues for push notifications server. enableSMPQueuesNtfs :: SMPClient -> NonEmpty (RcvPrivateAuthKey, RecipientId, NtfPublicAuthKey, RcvNtfPublicDhKey) -> IO (NonEmpty (Either SMPClientError (NotifierId, RcvNtfPublicDhKey))) @@ -692,7 +706,7 @@ enableSMPQueuesNtfs c qs = L.map process <$> sendProtocolCommands c cs cs = L.map (\(rpKey, rId, notifierKey, rcvNtfPublicDhKey) -> (Just rpKey, rId, Cmd SRecipient $ NKEY notifierKey rcvNtfPublicDhKey)) qs process (Response _ r) = case r of Right (NID nId rcvNtfSrvPublicDhKey) -> Right (nId, rcvNtfSrvPublicDhKey) - Right r' -> Left . PCEUnexpectedResponse $ bshow r' + Right r' -> Left $ unexpectedResponse r' Left e -> Left e -- | Disable notifications for the queue for push notifications server. @@ -714,7 +728,7 @@ sendSMPMessage :: SMPClient -> Maybe SndPrivateAuthKey -> SenderId -> MsgFlags - sendSMPMessage c spKey sId flags msg = sendSMPCommand c spKey sId (SEND flags msg) >>= \case OK -> pure () - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r -- | Acknowledge message delivery (server deletes the message). -- @@ -724,7 +738,7 @@ ackSMPMessage c rpKey rId msgId = sendSMPCommand c (Just rpKey) rId (ACK msgId) >>= \case OK -> return () cmd@MSG {} -> liftIO $ writeSMPMessage c rId cmd - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r -- | Irreversibly suspend SMP queue. -- The existing messages from the queue will still be delivered. @@ -756,7 +770,7 @@ connectSMPProxiedRelay c@ProtocolClient {client_ = PClient {tcpConnectTimeout, t case supportedClientSMPRelayVRange `compatibleVersion` vr of Nothing -> throwE $ transportErr TEVersion Just (Compatible v) -> liftEitherWith (const $ transportErr $ TEHandshake IDENTITY) $ ProxiedRelay sId v <$> validateRelay chain key - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r | otherwise = throwE $ PCETransportError TEVersion where tOut = Just $ tcpConnectTimeout + tcpTimeout @@ -862,14 +876,14 @@ proxySMPMessage c@ProtocolClient {thParams = proxyThParams, client_ = PClient {c (_auth, _signed, (_c, _e, cmd)) -> case cmd of Right OK -> pure $ Right () Right (ERR e) -> throwE $ PCEProtocolError e -- this is the error from the destination relay - Right e -> throwE $ PCEUnexpectedResponse $ B.take 32 $ bshow e + Right r' -> throwE $ unexpectedResponse r' Left e -> throwE $ PCEResponseError e _ -> throwE $ PCETransportError TEBadBlock ERR e -> pure . Left $ ProxyProtocolError e -- this will not happen, this error is returned via Left _ -> pure . Left $ ProxyUnexpectedResponse $ take 32 $ show r Left e -> case e of PCEProtocolError e' -> pure . Left $ ProxyProtocolError e' - PCEUnexpectedResponse r -> pure . Left $ ProxyUnexpectedResponse $ B.unpack r + PCEUnexpectedResponse e' -> pure . Left $ ProxyUnexpectedResponse $ B.unpack e' PCEResponseError e' -> pure . Left $ ProxyResponseError e' _ -> throwE e @@ -894,13 +908,13 @@ forwardSMPMessage c@ProtocolClient {thParams, client_ = PClient {clientCorrId = r' <- liftEitherWith PCECryptoError $ C.cbDecryptNoPad sessSecret (C.reverseNonce nonce) efr FwdResponse {fwdCorrId = _, fwdResponse} <- liftEitherWith (const $ PCEResponseError BLOCK) $ smpDecode r' pure fwdResponse - r -> throwE . PCEUnexpectedResponse $ B.take 32 $ bshow r + r -> throwE $ unexpectedResponse r okSMPCommand :: PartyI p => Command p -> SMPClient -> C.APrivateAuthKey -> QueueId -> ExceptT SMPClientError IO () okSMPCommand cmd c pKey qId = sendSMPCommand c (Just pKey) qId cmd >>= \case OK -> return () - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r okSMPCommands :: PartyI p => Command p -> SMPClient -> NonEmpty (C.APrivateAuthKey, QueueId) -> IO (NonEmpty (Either SMPClientError ())) okSMPCommands cmd c qs = L.map process <$> sendProtocolCommands c cs @@ -909,7 +923,7 @@ okSMPCommands cmd c qs = L.map process <$> sendProtocolCommands c cs cs = L.map (\(pKey, qId) -> (Just pKey, qId, aCmd)) qs process (Response _ r) = case r of Right OK -> Right () - Right r' -> Left . PCEUnexpectedResponse $ bshow r' + Right r' -> Left $ unexpectedResponse r' Left e -> Left e -- | Send SMP command diff --git a/src/Simplex/Messaging/Client/Agent.hs b/src/Simplex/Messaging/Client/Agent.hs index e08f7b2c9..10b83157f 100644 --- a/src/Simplex/Messaging/Client/Agent.hs +++ b/src/Simplex/Messaging/Client/Agent.hs @@ -99,7 +99,7 @@ defaultSMPClientAgentConfig = data SMPClientAgent = SMPClientAgent { agentCfg :: SMPClientAgentConfig, active :: TVar Bool, - msgQ :: TBQueue (ServerTransmission SMPVersion ErrorType BrokerMsg), + msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg), agentQ :: TBQueue SMPClientAgentEvent, randomDrg :: TVar ChaChaDRG, smpClients :: TMap SMPServer SMPClientVar, diff --git a/src/Simplex/Messaging/Notifications/Client.hs b/src/Simplex/Messaging/Notifications/Client.hs index cc698b344..32d92faf3 100644 --- a/src/Simplex/Messaging/Notifications/Client.hs +++ b/src/Simplex/Messaging/Notifications/Client.hs @@ -12,7 +12,6 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Notifications.Protocol import Simplex.Messaging.Notifications.Transport (NTFVersion, supportedClientNTFVRange, supportedNTFHandshakes) import Simplex.Messaging.Protocol (ErrorType) -import Simplex.Messaging.Util (bshow) type NtfClient = ProtocolClient NTFVersion ErrorType NtfResponse @@ -25,7 +24,7 @@ ntfRegisterToken :: NtfClient -> C.APrivateAuthKey -> NewNtfEntity 'Token -> Exc ntfRegisterToken c pKey newTkn = sendNtfCommand c (Just pKey) "" (TNEW newTkn) >>= \case NRTknId tknId dhKey -> pure (tknId, dhKey) - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r ntfVerifyToken :: NtfClient -> C.APrivateAuthKey -> NtfTokenId -> NtfRegCode -> ExceptT NtfClientError IO () ntfVerifyToken c pKey tknId code = okNtfCommand (TVFY code) c pKey tknId @@ -34,7 +33,7 @@ ntfCheckToken :: NtfClient -> C.APrivateAuthKey -> NtfTokenId -> ExceptT NtfClie ntfCheckToken c pKey tknId = sendNtfCommand c (Just pKey) tknId TCHK >>= \case NRTkn stat -> pure stat - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r ntfReplaceToken :: NtfClient -> C.APrivateAuthKey -> NtfTokenId -> DeviceToken -> ExceptT NtfClientError IO () ntfReplaceToken c pKey tknId token = okNtfCommand (TRPL token) c pKey tknId @@ -49,13 +48,13 @@ ntfCreateSubscription :: NtfClient -> C.APrivateAuthKey -> NewNtfEntity 'Subscri ntfCreateSubscription c pKey newSub = sendNtfCommand c (Just pKey) "" (SNEW newSub) >>= \case NRSubId subId -> pure subId - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r ntfCheckSubscription :: NtfClient -> C.APrivateAuthKey -> NtfSubscriptionId -> ExceptT NtfClientError IO NtfSubStatus ntfCheckSubscription c pKey subId = sendNtfCommand c (Just pKey) subId SCHK >>= \case NRSub stat -> pure stat - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r ntfDeleteSubscription :: NtfClient -> C.APrivateAuthKey -> NtfSubscriptionId -> ExceptT NtfClientError IO () ntfDeleteSubscription = okNtfCommand SDEL @@ -68,4 +67,4 @@ okNtfCommand :: NtfEntityI e => NtfCommand e -> NtfClient -> C.APrivateAuthKey - okNtfCommand cmd c pKey entId = sendNtfCommand c (Just pKey) entId cmd >>= \case NROk -> return () - r -> throwE . PCEUnexpectedResponse $ bshow r + r -> throwE $ unexpectedResponse r diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 5e4cf839e..892560660 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -31,7 +31,7 @@ import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) import Data.Time.Clock.System (getSystemTime) import Data.Time.Format.ISO8601 (iso8601Show) import Network.Socket (ServiceName) -import Simplex.Messaging.Client (ProtocolClientError (..), SMPClientError) +import Simplex.Messaging.Client (ProtocolClientError (..), SMPClientError, ServerTransmission (..)) import Simplex.Messaging.Client.Agent import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String @@ -220,23 +220,27 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge receiveSMP :: M () receiveSMP = forever $ do - ((_, srv, _), _, _, _tType, ntfId, msgOrErr) <- atomically $ readTBQueue msgQ - let smpQueue = SMPQueueNtf srv ntfId - case msgOrErr of - Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do - ntfTs <- liftIO getSystemTime - st <- asks store - NtfPushServer {pushQ} <- asks pushServer - stats <- asks serverStats - atomically $ updatePeriodStats (activeSubs stats) ntfId - atomically $ - findNtfSubscriptionToken st smpQueue - >>= mapM_ (\tkn -> writeTBQueue pushQ (tkn, PNMessage PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta})) - incNtfStat ntfReceived - Right SMP.END -> updateSubStatus smpQueue NSEnd - Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e - Right _ -> logError $ "SMP server unexpected response" - Left e -> logError $ "SMP client error: " <> tshow e + ((_, srv, _), _, _, ts) <- atomically $ readTBQueue msgQ + forM ts $ \(ntfId, t) -> case t of + STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen + STResponse {} -> pure () -- it was already reported as timeout error + STEvent msgOrErr -> do + let smpQueue = SMPQueueNtf srv ntfId + case msgOrErr of + Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do + ntfTs <- liftIO getSystemTime + st <- asks store + NtfPushServer {pushQ} <- asks pushServer + stats <- asks serverStats + atomically $ updatePeriodStats (activeSubs stats) ntfId + atomically $ + findNtfSubscriptionToken st smpQueue + >>= mapM_ (\tkn -> writeTBQueue pushQ (tkn, PNMessage PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta})) + incNtfStat ntfReceived + Right SMP.END -> updateSubStatus smpQueue NSEnd + Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e + Right _ -> logError $ "SMP server unexpected response" + Left e -> logError $ "SMP client error: " <> tshow e receiveAgent = forever $ @@ -248,7 +252,6 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge forM_ subs $ \(_, ntfId) -> do let smpQueue = SMPQueueNtf srv ntfId updateSubStatus smpQueue NSInactive - CAResubscribed srv subs -> do forM_ subs $ \(_, ntfId) -> updateSubStatus (SMPQueueNtf srv ntfId) NSActive logSubStatus srv "resubscribed" $ length subs diff --git a/tests/SMPProxyTests.hs b/tests/SMPProxyTests.hs index 467700784..1c458a062 100644 --- a/tests/SMPProxyTests.hs +++ b/tests/SMPProxyTests.hs @@ -120,7 +120,7 @@ deliverMessageViaProxy proxyServ relayServ alg msg msg' = do -- send via proxy to unsecured queue Right () <- proxySMPMessage pc sess Nothing sndId noMsgFlags msg -- receive 1 - (_tSess, _v, _sid, _isResp, _entId, Right (SMP.MSG RcvMessage {msgId, msgBody = EncRcvMsgBody encBody})) <- atomically $ readTBQueue msgQ + (_tSess, _v, _sid, [(_entId, STEvent (Right (SMP.MSG RcvMessage {msgId, msgBody = EncRcvMsgBody encBody})))]) <- atomically $ readTBQueue msgQ liftIO $ dec msgId encBody `shouldBe` Right msg ackSMPMessage rc rPriv rcvId msgId -- secure queue @@ -129,7 +129,7 @@ deliverMessageViaProxy proxyServ relayServ alg msg msg' = do -- send via proxy to secured queue Right () <- proxySMPMessage pc sess (Just sPriv) sndId noMsgFlags msg' -- receive 2 - (_tSess, _v, _sid, _isResp, _entId, Right (SMP.MSG RcvMessage {msgId = msgId', msgBody = EncRcvMsgBody encBody'})) <- atomically $ readTBQueue msgQ + (_tSess, _v, _sid, [(_entId, STEvent (Right (SMP.MSG RcvMessage {msgId = msgId', msgBody = EncRcvMsgBody encBody'})))]) <- atomically $ readTBQueue msgQ liftIO $ dec msgId' encBody' `shouldBe` Right msg' ackSMPMessage rc rPriv rcvId msgId'