From 6b6ea78eac049ed589ceb8debdfdb18059ddb375 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Fri, 1 Jul 2022 12:25:08 +0100 Subject: [PATCH] ntf server error handling (#448) * ntf server error handling * refactor * fix --- src/Simplex/Messaging/Agent/Client.hs | 2 +- .../Messaging/Agent/NtfSubSupervisor.hs | 2 +- src/Simplex/Messaging/Client.hs | 18 ++--- src/Simplex/Messaging/Notifications/Client.hs | 11 ++-- .../Messaging/Notifications/Protocol.hs | 16 ++--- src/Simplex/Messaging/Notifications/Server.hs | 66 +++++++++---------- src/Simplex/Messaging/Protocol.hs | 2 +- 7 files changed, 57 insertions(+), 60 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 7f7038be7..9dad6b6f9 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -422,7 +422,7 @@ protocolClientError :: (ErrorType -> AgentErrorType) -> ProtocolClientError -> A protocolClientError protocolError_ = \case PCEProtocolError e -> protocolError_ e PCEResponseError e -> BROKER $ RESPONSE e - PCEUnexpectedResponse -> BROKER UNEXPECTED + PCEUnexpectedResponse _ -> BROKER UNEXPECTED PCEResponseTimeout -> BROKER TIMEOUT PCENetworkError -> BROKER NETWORK PCETransportError e -> BROKER $ TRANSPORT e diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index 7073448fc..aa75dac01 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -197,7 +197,7 @@ runNtfWorker c srv doWork = forever $ do case ntfSubId of Just nSubId -> agentNtfCheckSubscription c nSubId tkn >>= \case - NSSMPAuth -> do + NSAuth -> do getNtfServer c >>= \case Just ntfServer -> do withStore' c $ \db -> diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index cb32ba3be..49f492611 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -224,7 +224,7 @@ getProtocolClient protocolServer cfg@ProtocolClientConfig {qSize, tcpTimeout, tc Right r -> case protocolError r of Just e -> Left $ PCEProtocolError e _ -> Right r - else Left PCEUnexpectedResponse + else Left . PCEUnexpectedResponse $ bshow respOrErr where sendMsg :: QueueId -> Either ErrorType msg -> IO () sendMsg qId = \case @@ -248,7 +248,7 @@ data ProtocolClientError -- e.g. server should respond `IDS` or `ERR` to `NEW` command, -- other responses would result in this error. -- Forwarded to the agent client as `ERR BROKER UNEXPECTED`. - PCEUnexpectedResponse + PCEUnexpectedResponse ByteString | -- | Used for TCP connection and command response timeouts. -- Forwarded to the agent client as `ERR BROKER TIMEOUT`. PCEResponseTimeout @@ -276,7 +276,7 @@ createSMPQueue :: createSMPQueue c rpKey rKey dhKey = sendSMPCommand c (Just rpKey) "" (NEW rKey dhKey) >>= \case IDS qik -> pure qik - _ -> throwE PCEUnexpectedResponse + r -> throwE . PCEUnexpectedResponse $ bshow r -- | Subscribe to the SMP queue. -- @@ -287,7 +287,7 @@ subscribeSMPQueue c@ProtocolClient {protocolServer, sessionId, msgQ} rpKey rId = OK -> return () cmd@MSG {} -> lift . atomically $ mapM_ (`writeTBQueue` (protocolServer, sessionId, rId, cmd)) msgQ - _ -> throwE PCEUnexpectedResponse + r -> throwE . PCEUnexpectedResponse $ bshow r -- | Get message from SMP queue. The server returns ERR PROHIBITED if a client uses SUB and GET via the same transport connection for the same queue -- @@ -299,7 +299,7 @@ getSMPMessage c@ProtocolClient {protocolServer, sessionId, msgQ} rpKey rId = cmd@(MSG msgId msgTs msgFlags _) -> do lift . atomically $ mapM_ (`writeTBQueue` (protocolServer, sessionId, rId, cmd)) msgQ pure $ Just SMP.SMPMsgMeta {msgId, msgTs, msgFlags} - _ -> throwE PCEUnexpectedResponse + r -> throwE . PCEUnexpectedResponse $ bshow r -- | Subscribe to the SMP queue notifications. -- @@ -320,7 +320,7 @@ enableSMPQueueNotifications :: SMPClient -> RcvPrivateSignKey -> RecipientId -> enableSMPQueueNotifications c rpKey rId notifierKey rcvNtfPublicDhKey = sendSMPCommand c (Just rpKey) rId (NKEY notifierKey rcvNtfPublicDhKey) >>= \case NID nId rcvNtfSrvPublicDhKey -> pure (nId, rcvNtfSrvPublicDhKey) - _ -> throwE PCEUnexpectedResponse + r -> throwE . PCEUnexpectedResponse $ bshow r -- | Disable notifications for the queue for push notifications server. -- @@ -335,7 +335,7 @@ sendSMPMessage :: SMPClient -> Maybe SndPrivateSignKey -> SenderId -> MsgFlags - sendSMPMessage c spKey sId flags msg = sendSMPCommand c spKey sId (SEND flags msg) >>= \case OK -> pure () - _ -> throwE PCEUnexpectedResponse + r -> throwE . PCEUnexpectedResponse $ bshow r -- | Acknowledge message delivery (server deletes the message). -- @@ -346,7 +346,7 @@ ackSMPMessage c@ProtocolClient {protocolServer, sessionId, msgQ} rpKey rId msgId OK -> return () cmd@MSG {} -> lift . atomically $ mapM_ (`writeTBQueue` (protocolServer, sessionId, rId, cmd)) msgQ - _ -> throwE PCEUnexpectedResponse + r -> throwE . PCEUnexpectedResponse $ bshow r -- | Irreversibly suspend SMP queue. -- The existing messages from the queue will still be delivered. @@ -365,7 +365,7 @@ okSMPCommand :: PartyI p => Command p -> SMPClient -> C.APrivateSignKey -> Queue okSMPCommand cmd c pKey qId = sendSMPCommand c (Just pKey) qId cmd >>= \case OK -> return () - _ -> throwE PCEUnexpectedResponse + r -> throwE . PCEUnexpectedResponse $ bshow r -- | Send SMP command sendSMPCommand :: PartyI p => SMPClient -> Maybe C.APrivateSignKey -> QueueId -> Command p -> ExceptT ProtocolClientError IO BrokerMsg diff --git a/src/Simplex/Messaging/Notifications/Client.hs b/src/Simplex/Messaging/Notifications/Client.hs index 51487a618..8f690e955 100644 --- a/src/Simplex/Messaging/Notifications/Client.hs +++ b/src/Simplex/Messaging/Notifications/Client.hs @@ -10,6 +10,7 @@ import Data.Word (Word16) import Simplex.Messaging.Client import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Notifications.Protocol +import Simplex.Messaging.Util (bshow) type NtfClient = ProtocolClient NtfResponse @@ -17,7 +18,7 @@ ntfRegisterToken :: NtfClient -> C.APrivateSignKey -> NewNtfEntity 'Token -> Exc ntfRegisterToken c pKey newTkn = sendNtfCommand c (Just pKey) "" (TNEW newTkn) >>= \case NRTknId tknId dhKey -> pure (tknId, dhKey) - _ -> throwE PCEUnexpectedResponse + r -> throwE . PCEUnexpectedResponse $ bshow r ntfVerifyToken :: NtfClient -> C.APrivateSignKey -> NtfTokenId -> NtfRegCode -> ExceptT ProtocolClientError IO () ntfVerifyToken c pKey tknId code = okNtfCommand (TVFY code) c pKey tknId @@ -26,7 +27,7 @@ ntfCheckToken :: NtfClient -> C.APrivateSignKey -> NtfTokenId -> ExceptT Protoco ntfCheckToken c pKey tknId = sendNtfCommand c (Just pKey) tknId TCHK >>= \case NRTkn stat -> pure stat - _ -> throwE PCEUnexpectedResponse + r -> throwE . PCEUnexpectedResponse $ bshow r ntfReplaceToken :: NtfClient -> C.APrivateSignKey -> NtfTokenId -> DeviceToken -> ExceptT ProtocolClientError IO () ntfReplaceToken c pKey tknId token = okNtfCommand (TRPL token) c pKey tknId @@ -41,13 +42,13 @@ ntfCreateSubscription :: NtfClient -> C.APrivateSignKey -> NewNtfEntity 'Subscri ntfCreateSubscription c pKey newSub = sendNtfCommand c (Just pKey) "" (SNEW newSub) >>= \case NRSubId subId -> pure subId - _ -> throwE PCEUnexpectedResponse + r -> throwE . PCEUnexpectedResponse $ bshow r ntfCheckSubscription :: NtfClient -> C.APrivateSignKey -> NtfSubscriptionId -> ExceptT ProtocolClientError IO NtfSubStatus ntfCheckSubscription c pKey subId = sendNtfCommand c (Just pKey) subId SCHK >>= \case NRSub stat -> pure stat - _ -> throwE PCEUnexpectedResponse + r -> throwE . PCEUnexpectedResponse $ bshow r ntfDeleteSubscription :: NtfClient -> C.APrivateSignKey -> NtfSubscriptionId -> ExceptT ProtocolClientError IO () ntfDeleteSubscription = okNtfCommand SDEL @@ -60,4 +61,4 @@ okNtfCommand :: NtfEntityI e => NtfCommand e -> NtfClient -> C.APrivateSignKey - okNtfCommand cmd c pKey entId = sendNtfCommand c (Just pKey) entId cmd >>= \case NROk -> return () - _ -> throwE PCEUnexpectedResponse + r -> throwE . PCEUnexpectedResponse $ bshow r diff --git a/src/Simplex/Messaging/Notifications/Protocol.hs b/src/Simplex/Messaging/Notifications/Protocol.hs index e025cf7e6..9b8e55184 100644 --- a/src/Simplex/Messaging/Notifications/Protocol.hs +++ b/src/Simplex/Messaging/Notifications/Protocol.hs @@ -403,12 +403,12 @@ data NtfSubStatus NSActive | -- | disconnected/unsubscribed from SMP server NSInactive - | -- | NEND received (we currently do not support it) + | -- | END received NSEnd | -- | SMP AUTH error - NSSMPAuth + NSAuth | -- | SMP error other than AUTH - NSSMPErr ErrorType + NSErr ByteString deriving (Eq, Show) instance Encoding NtfSubStatus where @@ -418,8 +418,8 @@ instance Encoding NtfSubStatus where NSActive -> "ACTIVE" NSInactive -> "INACTIVE" NSEnd -> "END" - NSSMPAuth -> "SMP_AUTH" - NSSMPErr err -> "SMP_ERR " <> smpEncode err + NSAuth -> "AUTH" + NSErr err -> "ERR " <> err smpP = A.takeTill (== ' ') >>= \case "NEW" -> pure NSNew @@ -427,10 +427,8 @@ instance Encoding NtfSubStatus where "ACTIVE" -> pure NSActive "INACTIVE" -> pure NSInactive "END" -> pure NSEnd - "SMP_AUTH" -> pure NSSMPAuth - "SMP_ERR" -> do - _ <- A.space - NSSMPErr <$> smpP + "AUTH" -> pure NSAuth + "ERR" -> NSErr <$> (A.space *> A.takeByteString) _ -> fail "bad NtfSubStatus" instance StrEncoding NtfSubStatus where diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index bbef83255..ffb475988 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -111,27 +111,20 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge runSMPSubscriber :: SMPSubscriber -> m () runSMPSubscriber SMPSubscriber {newSubQ = subscriberSubQ} = forever $ - atomically (peekTQueue subscriberSubQ) >>= \case - NtfSub NtfSubData {smpQueue, notifierKey} -> do + atomically (peekTQueue subscriberSubQ) + >>= \(NtfSub NtfSubData {smpQueue, notifierKey}) -> do updateSubStatus smpQueue NSPending let SMPQueueNtf {smpServer, notifierId} = smpQueue liftIO (runExceptT $ subscribeQueue ca smpServer ((SPNotifier, notifierId), notifierKey)) >>= \case - Right _ -> update smpQueue NSActive - Left err -> case err of - PCEProtocolError AUTH -> update smpQueue NSSMPAuth - PCEProtocolError e -> update smpQueue $ NSSMPErr e - PCEIOError e -> log' $ "IOError " <> T.pack (show e) - PCEResponseError e -> log' $ "ResponseError " <> T.pack (show e) - PCEUnexpectedResponse -> log' "UnexpectedResponse" - PCETransportError e -> log' $ "TransportError " <> T.pack (show e) - PCESignatureError e -> log' $ "SignatureError " <> T.pack (show e) - PCEResponseTimeout -> pure () - PCENetworkError -> pure () - where - update smpQueue status = do - updateSubStatus smpQueue status - void . atomically $ readTQueue subscriberSubQ - log' e = logError $ "SMPSubscriber subscribeQueue " <> e + Right _ -> do + updateSubStatus smpQueue NSActive + void . atomically $ readTQueue subscriberSubQ + Left err -> do + handleSubError smpQueue err + case err of + PCEResponseTimeout -> pure () + PCENetworkError -> pure () + _ -> void . atomically $ readTQueue subscriberSubQ receiveSMP :: m () receiveSMP = forever $ do @@ -145,7 +138,7 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge atomically $ findNtfSubscriptionToken st smpQueue >>= mapM_ (\tkn -> writeTBQueue pushQ (tkn, PNMessage PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta})) - SMP.END -> updateSubStatus smpQueue NSInactive + SMP.END -> updateSubStatus smpQueue NSEnd _ -> pure () pure () @@ -162,20 +155,23 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge let ntfId = snd sub smpQueue = SMPQueueNtf srv ntfId updateSubStatus smpQueue NSActive - CASubError srv sub err -> do - let ntfId = snd sub - smpQueue = SMPQueueNtf srv ntfId - case err of - PCEProtocolError e -> case e of - AUTH -> updateSubStatus smpQueue NSSMPAuth - _ -> updateSubStatus smpQueue (NSSMPErr e) - PCEResponseError e -> logErr e - PCEUnexpectedResponse -> logErr err - PCESignatureError e -> logErr e - PCEIOError e -> logErr e - _ -> pure () - where - logErr e = logError $ "ntfSubscriber receiveAgent error: " <> T.pack (show e) + CASubError srv (_, ntfId) err -> + handleSubError (SMPQueueNtf srv ntfId) err + + handleSubError :: SMPQueueNtf -> ProtocolClientError -> m () + handleSubError smpQueue = \case + PCEProtocolError AUTH -> updateSubStatus smpQueue NSAuth + PCEProtocolError e -> updateErr "SMP error " e + PCEIOError e -> updateErr "IOError " e + PCEResponseError e -> updateErr "ResponseError " e + PCEUnexpectedResponse r -> updateErr "UnexpectedResponse " r + PCETransportError e -> updateErr "TransportError " e + PCESignatureError e -> updateErr "SignatureError " e + PCEResponseTimeout -> pure () + PCENetworkError -> pure () + where + updateErr :: Show e => ByteString -> e -> m () + updateErr errType e = updateSubStatus smpQueue . NSErr $ errType <> bshow e updateSubStatus smpQueue status = do st <- asks store @@ -370,7 +366,9 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu | otherwise -> do logDebug "TVFY - incorrect code or token status" pure $ NRErr AUTH - TCHK -> pure $ NRTkn status + TCHK -> do + logDebug "TCHK" + pure $ NRTkn status TRPL token' -> do logDebug "TRPL - replace token" st <- asks store diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index 6bda98677..029e8eeb3 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -628,7 +628,7 @@ transmissionP = do command <- A.takeByteString pure RawTransmission {signature, signed, sessId, corrId, entityId, command} -class (ProtocolEncoding msg, ProtocolEncoding (ProtocolCommand msg)) => Protocol msg where +class (ProtocolEncoding msg, ProtocolEncoding (ProtocolCommand msg), Show msg) => Protocol msg where type ProtocolCommand msg = cmd | cmd -> msg protocolClientHandshake :: forall c. Transport c => c -> C.KeyHash -> VersionRange -> ExceptT TransportError IO (THandle c) protocolPing :: ProtocolCommand msg