diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index e3f0858e0..854826288 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -82,6 +82,7 @@ data SMPClient = SMPClient sessionId :: ByteString, smpServer :: SMPServer, tcpTimeout :: Int, + smpPingFailures :: TVar Int, clientCorrId :: TVar Natural, sentCommands :: TMap CorrId Request, sndQ :: TBQueue SentRawTransmission, @@ -103,7 +104,11 @@ data SMPClientConfig = SMPClientConfig -- | TCP keep-alive options, Nothing to skip enabling keep-alive tcpKeepAlive :: Maybe KeepAliveOpts, -- | period for SMP ping commands (microseconds) - smpPing :: Int + smpPing :: Int, + -- | timpeout for SMP pings (microseconds) + smpPingTimeout :: Int, + -- | failed pings count + smpPingFailLimit :: Int } -- | Default SMP client configuration. @@ -114,7 +119,9 @@ smpDefaultConfig = defaultTransport = ("5223", transport @TLS), tcpTimeout = 5_000_000, tcpKeepAlive = Just defaultKeepAliveOpts, - smpPing = 300_000_000 -- 5 min + smpPing = 300_000_000, -- 5 min, + smpPingTimeout = 10_000_000, + smpPingFailLimit = 3 } data Request = Request @@ -130,12 +137,13 @@ type Response = Either SMPClientError BrokerMsg -- A single queue can be used for multiple 'SMPClient' instances, -- as 'SMPServerTransmission' includes server information. getSMPClient :: SMPServer -> SMPClientConfig -> TBQueue SMPServerTransmission -> IO () -> IO (Either SMPClientError SMPClient) -getSMPClient smpServer cfg@SMPClientConfig {qSize, tcpTimeout, tcpKeepAlive, smpPing} msgQ disconnected = +getSMPClient smpServer cfg@SMPClientConfig {qSize, tcpTimeout, smpPingTimeout, tcpKeepAlive, smpPing, smpPingFailLimit} msgQ disconnected = atomically mkSMPClient >>= runClient useTransport where mkSMPClient :: STM SMPClient mkSMPClient = do connected <- newTVar False + smpPingFailures <- newTVar smpPingFailLimit clientCorrId <- newTVar 0 sentCommands <- TM.empty sndQ <- newTBQueue qSize @@ -147,6 +155,7 @@ getSMPClient smpServer cfg@SMPClientConfig {qSize, tcpTimeout, tcpKeepAlive, smp connected, smpServer, tcpTimeout, + smpPingFailures, clientCorrId, sentCommands, sndQ, @@ -192,9 +201,13 @@ getSMPClient smpServer cfg@SMPClientConfig {qSize, tcpTimeout, tcpKeepAlive, smp receive SMPClient {rcvQ} h = forever $ tGet h >>= atomically . writeTBQueue rcvQ ping :: SMPClient -> IO () - ping c = forever $ do + ping c@SMPClient {smpPingFailures} = forever $ do threadDelay smpPing - void . either throwIO pure =<< runExceptT (sendSMPCommand c Nothing "" PING) + runExceptT (sendSMPCommand c Nothing "" PING $ Just smpPingTimeout) >>= \case + Right _ -> atomically $ writeTVar smpPingFailures smpPingFailLimit + Left e -> do + n <- atomically $ stateTVar smpPingFailures $ \n -> (n - 1, n - 1) + when (n == 0) $ throwIO e process :: SMPClient -> IO () process SMPClient {rcvQ, sentCommands} = forever $ do @@ -260,7 +273,7 @@ createSMPQueue :: RcvPublicDhKey -> ExceptT SMPClientError IO QueueIdsKeys createSMPQueue c rpKey rKey dhKey = - sendSMPCommand c (Just rpKey) "" (NEW rKey dhKey) >>= \case + sendSMPCommand c (Just rpKey) "" (NEW rKey dhKey) Nothing >>= \case IDS qik -> pure qik _ -> throwE SMPUnexpectedResponse @@ -269,7 +282,7 @@ createSMPQueue c rpKey rKey dhKey = -- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#subscribe-to-queue subscribeSMPQueue :: SMPClient -> RcvPrivateSignKey -> RecipientId -> ExceptT SMPClientError IO () subscribeSMPQueue c@SMPClient {smpServer, msgQ} rpKey rId = - sendSMPCommand c (Just rpKey) rId SUB >>= \case + sendSMPCommand c (Just rpKey) rId SUB Nothing >>= \case OK -> return () cmd@MSG {} -> lift . atomically $ writeTBQueue msgQ (smpServer, rId, cmd) @@ -292,7 +305,7 @@ secureSMPQueue c rpKey rId senderKey = okSMPCommand (KEY senderKey) c rpKey rId -- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#enable-notifications-command enableSMPQueueNotifications :: SMPClient -> RcvPrivateSignKey -> RecipientId -> NtfPublicVerifyKey -> ExceptT SMPClientError IO NotifierId enableSMPQueueNotifications c rpKey rId notifierKey = - sendSMPCommand c (Just rpKey) rId (NKEY notifierKey) >>= \case + sendSMPCommand c (Just rpKey) rId (NKEY notifierKey) Nothing >>= \case NID nId -> pure nId _ -> throwE SMPUnexpectedResponse @@ -301,7 +314,7 @@ enableSMPQueueNotifications c rpKey rId notifierKey = -- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#send-message sendSMPMessage :: SMPClient -> Maybe SndPrivateSignKey -> SenderId -> MsgBody -> ExceptT SMPClientError IO () sendSMPMessage c spKey sId msg = - sendSMPCommand c spKey sId (SEND msg) >>= \case + sendSMPCommand c spKey sId (SEND msg) Nothing >>= \case OK -> pure () _ -> throwE SMPUnexpectedResponse @@ -310,7 +323,7 @@ sendSMPMessage c spKey sId msg = -- https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#acknowledge-message-delivery ackSMPMessage :: SMPClient -> RcvPrivateSignKey -> QueueId -> ExceptT SMPClientError IO () ackSMPMessage c@SMPClient {smpServer, msgQ} rpKey rId = - sendSMPCommand c (Just rpKey) rId ACK >>= \case + sendSMPCommand c (Just rpKey) rId ACK Nothing >>= \case OK -> return () cmd@MSG {} -> lift . atomically $ writeTBQueue msgQ (smpServer, rId, cmd) @@ -331,14 +344,14 @@ deleteSMPQueue = okSMPCommand DEL okSMPCommand :: PartyI p => Command p -> SMPClient -> C.APrivateSignKey -> QueueId -> ExceptT SMPClientError IO () okSMPCommand cmd c pKey qId = - sendSMPCommand c (Just pKey) qId cmd >>= \case + sendSMPCommand c (Just pKey) qId cmd Nothing >>= \case OK -> return () _ -> throwE SMPUnexpectedResponse -- | Send SMP command -- TODO sign all requests (SEND of SMP confirmation would be signed with the same key that is passed to the recipient) -sendSMPCommand :: PartyI p => SMPClient -> Maybe C.APrivateSignKey -> QueueId -> Command p -> ExceptT SMPClientError IO BrokerMsg -sendSMPCommand SMPClient {sndQ, sentCommands, clientCorrId, sessionId, tcpTimeout} pKey qId cmd = do +sendSMPCommand :: PartyI p => SMPClient -> Maybe C.APrivateSignKey -> QueueId -> Command p -> Maybe Int -> ExceptT SMPClientError IO BrokerMsg +sendSMPCommand SMPClient {sndQ, sentCommands, clientCorrId, sessionId, tcpTimeout} pKey qId cmd cmdTimeout_ = do corrId <- lift_ getNextCorrId t <- signTransmission $ encodeTransmission sessionId (corrId, qId, cmd) ExceptT $ sendRecv corrId t @@ -362,7 +375,7 @@ sendSMPCommand SMPClient {sndQ, sentCommands, clientCorrId, sessionId, tcpTimeou sendRecv :: CorrId -> SentRawTransmission -> IO Response sendRecv corrId t = atomically (send corrId t) >>= withTimeout . atomically . takeTMVar where - withTimeout a = fromMaybe (Left SMPResponseTimeout) <$> timeout tcpTimeout a + withTimeout a = fromMaybe (Left SMPResponseTimeout) <$> timeout (fromMaybe tcpTimeout cmdTimeout_) a send :: CorrId -> SentRawTransmission -> STM (TMVar Response) send corrId t = do