mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-14 22:45:06 +00:00
SMP heartbeat to maintain the connection (#59)
* SMP heartbeat to maintain the connection * separate SMP commands into sections * update SMP command sections * update SMP commands comment Co-authored-by: Efim Poberezkin <efim.poberezkin@gmail.com>
This commit is contained in:
committed by
GitHub
parent
3af34dea8b
commit
7570ef9e22
@@ -20,6 +20,7 @@ dependencies:
|
||||
- containers
|
||||
- cryptonite == 0.26.*
|
||||
- iso8601-time == 0.1.*
|
||||
- math-functions == 0.3.*
|
||||
- memory == 0.15.*
|
||||
- mtl
|
||||
- network == 3.1.*
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
{-# LANGUAGE NumericUnderscores #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE TupleSections #-}
|
||||
|
||||
module Simplex.Messaging.Client
|
||||
( SMPClient,
|
||||
@@ -27,6 +26,7 @@ module Simplex.Messaging.Client
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent (threadDelay)
|
||||
import Control.Concurrent.Async
|
||||
import Control.Concurrent.STM
|
||||
import Control.Exception
|
||||
@@ -68,7 +68,8 @@ type SMPServerTransmission = (SMPServer, RecipientId, Command 'Broker)
|
||||
data SMPClientConfig = SMPClientConfig
|
||||
{ qSize :: Natural,
|
||||
defaultPort :: ServiceName,
|
||||
tcpTimeout :: Int
|
||||
tcpTimeout :: Int,
|
||||
smpPing :: Int
|
||||
}
|
||||
|
||||
smpDefaultConfig :: SMPClientConfig
|
||||
@@ -76,7 +77,8 @@ smpDefaultConfig =
|
||||
SMPClientConfig
|
||||
{ qSize = 16,
|
||||
defaultPort = "5223",
|
||||
tcpTimeout = 2_000_000
|
||||
tcpTimeout = 2_000_000,
|
||||
smpPing = 30_000_000
|
||||
}
|
||||
|
||||
data Request = Request
|
||||
@@ -87,7 +89,7 @@ data Request = Request
|
||||
getSMPClient :: SMPServer -> SMPClientConfig -> TBQueue SMPServerTransmission -> IO () -> IO SMPClient
|
||||
getSMPClient
|
||||
smpServer@SMPServer {host, port}
|
||||
SMPClientConfig {qSize, defaultPort, tcpTimeout}
|
||||
SMPClientConfig {qSize, defaultPort, tcpTimeout, smpPing}
|
||||
msgQ
|
||||
disconnected = do
|
||||
c <- atomically mkSMPClient
|
||||
@@ -128,7 +130,7 @@ getSMPClient
|
||||
atomically $ do
|
||||
modifyTVar (connected c) (const True)
|
||||
putTMVar started True
|
||||
raceAny_ [send c h, process c, receive c h]
|
||||
raceAny_ [send c h, process c, receive c h, ping c]
|
||||
`finally` disconnected
|
||||
|
||||
send :: SMPClient -> Handle -> IO ()
|
||||
@@ -137,6 +139,11 @@ getSMPClient
|
||||
receive :: SMPClient -> Handle -> IO ()
|
||||
receive SMPClient {rcvQ} h = forever $ tGet fromServer h >>= atomically . writeTBQueue rcvQ
|
||||
|
||||
ping :: SMPClient -> IO ()
|
||||
ping c = forever $ do
|
||||
threadDelay smpPing
|
||||
runExceptT $ sendSMPCommand c Nothing "" (Cmd SSender PING)
|
||||
|
||||
process :: SMPClient -> IO ()
|
||||
process SMPClient {rcvQ, sentCommands} = forever $ do
|
||||
(_, (corrId, qId, respOrErr)) <- atomically $ readTBQueue rcvQ
|
||||
|
||||
@@ -54,6 +54,8 @@ import Database.SQLite.Simple.Ok (Ok (Ok))
|
||||
import Database.SQLite.Simple.ToField (ToField (..))
|
||||
import Simplex.Messaging.Parsers (base64P)
|
||||
import Simplex.Messaging.Util (bshow, liftEitherError, (<$$>))
|
||||
import Data.Bits (shift, complement, (.&.))
|
||||
import Numeric.SpecFunctions (log2)
|
||||
|
||||
newtype PublicKey = PublicKey {rsaPublicKey :: R.PublicKey} deriving (Eq, Show)
|
||||
|
||||
@@ -243,3 +245,13 @@ rsaPrivateKey pk =
|
||||
R.private_dQ = undefined,
|
||||
R.private_qinv = undefined
|
||||
}
|
||||
|
||||
-- | computes padded message length using Padmé padding scheme
|
||||
-- https://bford.info/pub/sec/purb.pdf
|
||||
-- currently not used
|
||||
paddedLength :: Int -> Int
|
||||
paddedLength len = (len + mask) .&. complement mask
|
||||
where
|
||||
mask = (1 `shift` zeroBytes len) - 1
|
||||
zeroBytes 1 = 0
|
||||
zeroBytes l = let e = log2 l in e - log2 e - 1
|
||||
|
||||
@@ -64,18 +64,23 @@ type SenderId = QueueId
|
||||
type QueueId = Encoded
|
||||
|
||||
data Command (a :: Party) where
|
||||
-- SMP recipient commands
|
||||
NEW :: RecipientPublicKey -> Command Recipient
|
||||
SUB :: Command Recipient
|
||||
KEY :: SenderPublicKey -> Command Recipient
|
||||
ACK :: Command Recipient
|
||||
OFF :: Command Recipient
|
||||
DEL :: Command Recipient
|
||||
-- SMP sender commands
|
||||
SEND :: MsgBody -> Command Sender
|
||||
PING :: Command Sender
|
||||
-- SMP broker commands (responses, messages, notifications)
|
||||
IDS :: RecipientId -> SenderId -> Command Broker
|
||||
MSG :: MsgId -> UTCTime -> MsgBody -> Command Broker
|
||||
END :: Command Broker
|
||||
OK :: Command Broker
|
||||
ERR :: ErrorType -> Command Broker
|
||||
PONG :: Command Broker
|
||||
|
||||
deriving instance Show (Command a)
|
||||
|
||||
@@ -91,10 +96,12 @@ commandP =
|
||||
<|> "OFF" $> Cmd SRecipient OFF
|
||||
<|> "DEL" $> Cmd SRecipient DEL
|
||||
<|> "SEND " *> sendCmd
|
||||
<|> "PING" $> Cmd SSender PING
|
||||
<|> "MSG " *> message
|
||||
<|> "END" $> Cmd SBroker END
|
||||
<|> "OK" $> Cmd SBroker OK
|
||||
<|> "ERR " *> serverError
|
||||
<|> "PONG" $> Cmd SBroker PONG
|
||||
where
|
||||
newCmd = Cmd SRecipient . NEW <$> C.pubKeyP
|
||||
idsResp = Cmd SBroker <$> (IDS <$> (base64P <* A.space) <*> base64P)
|
||||
@@ -121,6 +128,7 @@ serializeCommand = \case
|
||||
Cmd SRecipient (KEY sKey) -> "KEY " <> C.serializePubKey sKey
|
||||
Cmd SRecipient cmd -> B.pack $ show cmd
|
||||
Cmd SSender (SEND msgBody) -> "SEND" <> serializeMsg msgBody
|
||||
Cmd SSender PING -> "PING"
|
||||
Cmd SBroker (MSG msgId ts msgBody) ->
|
||||
B.unwords ["MSG", encode msgId, B.pack $ formatISO8601Millis ts] <> serializeMsg msgBody
|
||||
Cmd SBroker (IDS rId sId) -> B.unwords ["IDS", encode rId, encode sId]
|
||||
@@ -186,6 +194,10 @@ tGet fromParty h = do
|
||||
Cmd SBroker (IDS _ _) -> Right cmd
|
||||
-- ERR response does not always have queue ID
|
||||
Cmd SBroker (ERR _) -> Right cmd
|
||||
-- PONG response should not have queue ID
|
||||
Cmd SBroker PONG
|
||||
| B.null queueId -> Right cmd
|
||||
| otherwise -> Left $ SYNTAX errHasCredentials
|
||||
-- other responses must have queue ID
|
||||
Cmd SBroker _
|
||||
| B.null queueId -> Left $ SYNTAX errNoQueueId
|
||||
@@ -199,6 +211,10 @@ tGet fromParty h = do
|
||||
Cmd SSender (SEND _)
|
||||
| B.null queueId -> Left $ SYNTAX errNoQueueId
|
||||
| otherwise -> Right cmd
|
||||
-- PING must not have queue ID or signature
|
||||
Cmd SSender PING
|
||||
| B.null queueId && B.null signature -> Right cmd
|
||||
| otherwise -> Left $ SYNTAX errHasCredentials
|
||||
-- other client commands must have both signature and queue ID
|
||||
Cmd SRecipient _
|
||||
| B.null signature || B.null queueId -> Left $ SYNTAX errNoCredentials
|
||||
|
||||
@@ -99,6 +99,7 @@ verifyTransmission (sig, t@(corrId, queueId, cmd)) = do
|
||||
Cmd SRecipient (NEW k) -> return $ verifySignature k
|
||||
Cmd SRecipient _ -> withQueueRec SRecipient $ verifySignature . recipientKey
|
||||
Cmd SSender (SEND _) -> withQueueRec SSender $ verifySend sig . senderKey
|
||||
Cmd SSender PING -> return cmd
|
||||
where
|
||||
withQueueRec :: SParty (p :: Party) -> (QueueRec -> Cmd) -> m Cmd
|
||||
withQueueRec party f = do
|
||||
@@ -130,7 +131,9 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
|
||||
case cmd of
|
||||
Cmd SBroker END -> unsubscribeQueue $> (corrId, queueId, cmd)
|
||||
Cmd SBroker _ -> return (corrId, queueId, cmd)
|
||||
Cmd SSender (SEND msgBody) -> sendMessage st msgBody
|
||||
Cmd SSender command -> case command of
|
||||
SEND msgBody -> sendMessage st msgBody
|
||||
PING -> return (corrId, queueId, Cmd SBroker PONG)
|
||||
Cmd SRecipient command -> case command of
|
||||
NEW rKey -> createQueue st rKey
|
||||
SUB -> subscribeQueue queueId
|
||||
|
||||
@@ -117,7 +117,8 @@ cfg =
|
||||
SMPClientConfig
|
||||
{ qSize = 1,
|
||||
defaultPort = testPort,
|
||||
tcpTimeout = 500_000
|
||||
tcpTimeout = 500_000,
|
||||
smpPing = 2_000_000
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -288,6 +288,8 @@ syntaxTests = do
|
||||
it "bad message body 1" $ ("1234", "cdab", "12345678", "SEND 11 hello") >#> ("", "cdab", "12345678", "ERR SYNTAX 2")
|
||||
it "bad message body 2" $ ("1234", "dabc", "12345678", "SEND hello") >#> ("", "dabc", "12345678", "ERR SYNTAX 2")
|
||||
it "bigger body" $ ("1234", "abcd", "12345678", "SEND 4\r\nhello") >#> ("", "abcd", "12345678", "ERR SIZE")
|
||||
describe "PING" do
|
||||
it "valid syntax" $ ("", "abcd", "", "PING") >#> ("", "abcd", "", "PONG")
|
||||
describe "broker response not allowed" do
|
||||
it "OK" $ ("1234", "bcda", "12345678", "OK") >#> ("", "bcda", "12345678", "ERR PROHIBITED")
|
||||
where
|
||||
|
||||
Reference in New Issue
Block a user