mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 16:26:02 +00:00
type CorrelationId -> newtype CorrId (to avoid incorrect order in tuple)
This commit is contained in:
@@ -51,7 +51,7 @@ runSMPServer cfg@ServerConfig {tcpPort} = do
|
||||
(rId, clnt) <- readTBQueue subscribedQ
|
||||
cs <- readTVar subscribers
|
||||
case M.lookup rId cs of
|
||||
Just Client {rcvQ} -> writeTBQueue rcvQ (B.empty, rId, Cmd SBroker END)
|
||||
Just Client {rcvQ} -> writeTBQueue rcvQ (CorrId B.empty, rId, Cmd SBroker END)
|
||||
Nothing -> return ()
|
||||
writeTVar subscribers $ M.insert rId clnt cs
|
||||
|
||||
@@ -86,7 +86,7 @@ send h Client {sndQ} = forever $ do
|
||||
signed <- atomically $ readTBQueue sndQ
|
||||
tPut h (B.empty, signed)
|
||||
|
||||
mkResp :: CorrelationId -> QueueId -> Command 'Broker -> Signed
|
||||
mkResp :: CorrId -> QueueId -> Command 'Broker -> Signed
|
||||
mkResp corrId queueId command = (corrId, queueId, Cmd SBroker command)
|
||||
|
||||
verifyTransmission :: forall m. (MonadUnliftIO m, MonadReader Env m) => Transmission -> m Signed
|
||||
@@ -231,7 +231,7 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
|
||||
subscriber :: MsgQueue -> m ()
|
||||
subscriber q = atomically $ do
|
||||
msg <- peekMsg q
|
||||
writeTBQueue sndQ $ mkResp B.empty rId (msgCmd msg)
|
||||
writeTBQueue sndQ $ mkResp (CorrId B.empty) rId (msgCmd msg)
|
||||
setSub (\s -> s {subThread = NoSub})
|
||||
void setDelivered
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Char (ord)
|
||||
import Data.Kind
|
||||
import Data.String
|
||||
import Data.Time.Clock
|
||||
import Data.Time.ISO8601
|
||||
import Simplex.Messaging.Transport
|
||||
@@ -39,11 +40,11 @@ data Cmd where
|
||||
|
||||
deriving instance Show Cmd
|
||||
|
||||
type Signed = (CorrelationId, QueueId, Cmd)
|
||||
type Signed = (CorrId, QueueId, Cmd)
|
||||
|
||||
type Transmission = (Signature, Signed)
|
||||
|
||||
type SignedOrError = (CorrelationId, QueueId, Either ErrorType Cmd)
|
||||
type SignedOrError = (CorrId, QueueId, Either ErrorType Cmd)
|
||||
|
||||
type TransmissionOrError = (Signature, SignedOrError)
|
||||
|
||||
@@ -143,7 +144,11 @@ serializeCommand = \case
|
||||
|
||||
type Encoded = ByteString
|
||||
|
||||
type CorrelationId = ByteString
|
||||
-- newtype to avoid accidentally changing order of transmission parts
|
||||
newtype CorrId = CorrId {bs :: ByteString} deriving (Eq)
|
||||
|
||||
instance IsString CorrId where
|
||||
fromString = CorrId . fromString
|
||||
|
||||
type PublicKey = Encoded
|
||||
|
||||
@@ -199,7 +204,7 @@ tGetRaw h = do
|
||||
return (signature, corrId, queueId, command)
|
||||
|
||||
tPut :: MonadIO m => Handle -> Transmission -> m ()
|
||||
tPut h (signature, (corrId, queueId, command)) = tPutRaw h (encode signature, corrId, encode queueId, serializeCommand command)
|
||||
tPut h (signature, (corrId, queueId, command)) = tPutRaw h (encode signature, bs corrId, encode queueId, serializeCommand command)
|
||||
|
||||
fromClient :: Cmd -> Either ErrorType Cmd
|
||||
fromClient = \case
|
||||
@@ -220,13 +225,13 @@ tGet fromParty h = do
|
||||
either (const $ tError corrId) tParseLoadBody decodedTransmission
|
||||
where
|
||||
tError :: ByteString -> m TransmissionOrError
|
||||
tError corrId = return (B.empty, (corrId, B.empty, Left $ SYNTAX errBadTransmission))
|
||||
tError corrId = return (B.empty, (CorrId corrId, B.empty, Left $ SYNTAX errBadTransmission))
|
||||
|
||||
tParseLoadBody :: RawTransmission -> m TransmissionOrError
|
||||
tParseLoadBody t@(signature, corrId, queueId, command) = do
|
||||
let cmd = parseCommand command >>= fromParty >>= tCredentials t
|
||||
fullCmd <- either (return . Left) cmdWithMsgBody cmd
|
||||
return (signature, (corrId, queueId, fullCmd))
|
||||
return (signature, (CorrId corrId, queueId, fullCmd))
|
||||
|
||||
tCredentials :: RawTransmission -> Cmd -> Either ErrorType Cmd
|
||||
tCredentials (signature, _, queueId, _) cmd = case cmd of
|
||||
|
||||
@@ -25,7 +25,7 @@ main = hspec do
|
||||
describe "duplex communication over 2 SMP connections" testDuplex
|
||||
describe "switch subscription to another SMP queue" testSwitchSub
|
||||
|
||||
pattern Resp :: CorrelationId -> QueueId -> Command 'Broker -> TransmissionOrError
|
||||
pattern Resp :: CorrId -> QueueId -> Command 'Broker -> TransmissionOrError
|
||||
pattern Resp corrId queueId command <- ("", (corrId, queueId, Right (Cmd SBroker command)))
|
||||
|
||||
sendRecv :: Handle -> (ByteString, ByteString, ByteString, ByteString) -> IO TransmissionOrError
|
||||
|
||||
Reference in New Issue
Block a user