mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-29 10:10:06 +00:00
rename "connection" to "queue"
This commit is contained in:
@@ -7,8 +7,8 @@ cfg :: Config
|
||||
cfg =
|
||||
Config
|
||||
{ tcpPort = "5223",
|
||||
queueSize = 16,
|
||||
connIdBytes = 12,
|
||||
tbqSize = 16,
|
||||
queueIdBytes = 12,
|
||||
msgIdBytes = 6
|
||||
}
|
||||
|
||||
|
||||
@@ -1,36 +0,0 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE KindSignatures #-}
|
||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
|
||||
module ConnStore where
|
||||
|
||||
import Transmission
|
||||
|
||||
data Connection = Connection
|
||||
{ recipientId :: ConnId,
|
||||
senderId :: ConnId,
|
||||
recipientKey :: PublicKey,
|
||||
senderKey :: Maybe PublicKey,
|
||||
status :: ConnStatus
|
||||
}
|
||||
|
||||
data ConnStatus = ConnActive | ConnOff
|
||||
|
||||
class MonadConnStore s m where
|
||||
addConn :: s -> RecipientKey -> (RecipientId, SenderId) -> m (Either ErrorType ())
|
||||
getConn :: s -> SParty (a :: Party) -> ConnId -> m (Either ErrorType Connection)
|
||||
secureConn :: s -> RecipientId -> SenderKey -> m (Either ErrorType ())
|
||||
suspendConn :: s -> RecipientId -> m (Either ErrorType ())
|
||||
deleteConn :: s -> RecipientId -> m (Either ErrorType ())
|
||||
|
||||
-- TODO stub
|
||||
mkConnection :: RecipientKey -> (RecipientId, SenderId) -> Connection
|
||||
mkConnection recipientKey (recipientId, senderId) =
|
||||
Connection
|
||||
{ recipientId,
|
||||
senderId,
|
||||
recipientKey,
|
||||
senderKey = Nothing,
|
||||
status = ConnActive
|
||||
}
|
||||
@@ -1,92 +0,0 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE FlexibleInstances #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE InstanceSigs #-}
|
||||
{-# LANGUAGE KindSignatures #-}
|
||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE RankNTypes #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE UndecidableInstances #-}
|
||||
|
||||
module ConnStore.STM where
|
||||
|
||||
import ConnStore
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Transmission
|
||||
import UnliftIO.STM
|
||||
|
||||
data ConnStoreData = ConnStoreData
|
||||
{ connections :: Map RecipientId Connection,
|
||||
senders :: Map SenderId RecipientId
|
||||
}
|
||||
|
||||
type ConnStore = TVar ConnStoreData
|
||||
|
||||
newConnStore :: STM ConnStore
|
||||
newConnStore = newTVar ConnStoreData {connections = M.empty, senders = M.empty}
|
||||
|
||||
instance MonadConnStore ConnStore STM where
|
||||
addConn :: ConnStore -> RecipientKey -> (RecipientId, SenderId) -> STM (Either ErrorType ())
|
||||
addConn store rKey ids@(rId, sId) = do
|
||||
cs@ConnStoreData {connections, senders} <- readTVar store
|
||||
if M.member rId connections || M.member sId senders
|
||||
then return $ Left DUPLICATE
|
||||
else do
|
||||
writeTVar store $
|
||||
cs
|
||||
{ connections = M.insert rId (mkConnection rKey ids) connections,
|
||||
senders = M.insert sId rId senders
|
||||
}
|
||||
return $ Right ()
|
||||
|
||||
getConn :: ConnStore -> SParty (p :: Party) -> ConnId -> STM (Either ErrorType Connection)
|
||||
getConn store SRecipient rId = do
|
||||
cs <- readTVar store
|
||||
return $ getRcpConn cs rId
|
||||
getConn store SSender sId = do
|
||||
cs <- readTVar store
|
||||
let rId = M.lookup sId $ senders cs
|
||||
return $ maybe (Left AUTH) (getRcpConn cs) rId
|
||||
getConn _ SBroker _ =
|
||||
return $ Left INTERNAL
|
||||
|
||||
secureConn store rId sKey =
|
||||
updateConnections store rId $ \cs c ->
|
||||
case senderKey c of
|
||||
Just _ -> (Left AUTH, cs)
|
||||
_ -> (Right (), cs {connections = M.insert rId c {senderKey = Just sKey} (connections cs)})
|
||||
|
||||
suspendConn :: ConnStore -> RecipientId -> STM (Either ErrorType ())
|
||||
suspendConn store rId =
|
||||
updateConnections store rId $ \cs c ->
|
||||
(Right (), cs {connections = M.insert rId c {status = ConnOff} (connections cs)})
|
||||
|
||||
deleteConn :: ConnStore -> RecipientId -> STM (Either ErrorType ())
|
||||
deleteConn store rId =
|
||||
updateConnections store rId $ \cs c ->
|
||||
( Right (),
|
||||
cs
|
||||
{ connections = M.delete rId (connections cs),
|
||||
senders = M.delete (senderId c) (senders cs)
|
||||
}
|
||||
)
|
||||
|
||||
updateConnections ::
|
||||
ConnStore ->
|
||||
RecipientId ->
|
||||
(ConnStoreData -> Connection -> (Either ErrorType (), ConnStoreData)) ->
|
||||
STM (Either ErrorType ())
|
||||
updateConnections store rId update = do
|
||||
cs <- readTVar store
|
||||
let conn = getRcpConn cs rId
|
||||
either (return . Left) (_update cs) conn
|
||||
where
|
||||
_update cs c = do
|
||||
let (res, cs') = update cs c
|
||||
writeTVar store cs'
|
||||
return res
|
||||
|
||||
getRcpConn :: ConnStoreData -> RecipientId -> Either ErrorType Connection
|
||||
getRcpConn cs rId = maybe (Left AUTH) Right . M.lookup rId $ connections cs
|
||||
@@ -3,7 +3,6 @@
|
||||
|
||||
module Env.STM where
|
||||
|
||||
import ConnStore.STM
|
||||
import Control.Concurrent (ThreadId)
|
||||
import Control.Monad.IO.Unlift
|
||||
import Crypto.Random
|
||||
@@ -12,20 +11,21 @@ import qualified Data.Map.Strict as M
|
||||
import MsgStore.STM
|
||||
import Network.Socket (ServiceName)
|
||||
import Numeric.Natural
|
||||
import QueueStore.STM
|
||||
import Transmission
|
||||
import UnliftIO.STM
|
||||
|
||||
data Config = Config
|
||||
{ tcpPort :: ServiceName,
|
||||
queueSize :: Natural,
|
||||
connIdBytes :: Int,
|
||||
tbqSize :: Natural,
|
||||
queueIdBytes :: Int,
|
||||
msgIdBytes :: Int
|
||||
}
|
||||
|
||||
data Env = Env
|
||||
{ config :: Config,
|
||||
server :: Server,
|
||||
connStore :: ConnStore,
|
||||
queueStore :: QueueStore,
|
||||
msgStore :: STMMsgStore,
|
||||
idsDrg :: TVar ChaChaDRG
|
||||
}
|
||||
@@ -68,8 +68,8 @@ newSubscription = do
|
||||
|
||||
newEnv :: (MonadUnliftIO m, MonadRandom m) => Config -> m Env
|
||||
newEnv config = do
|
||||
server <- atomically $ newServer (queueSize config)
|
||||
connStore <- atomically newConnStore
|
||||
server <- atomically $ newServer (tbqSize config)
|
||||
queueStore <- atomically newQueueStore
|
||||
msgStore <- atomically newMsgStore
|
||||
idsDrg <- drgNew >>= newTVarIO
|
||||
return Env {config, server, connStore, msgStore, idsDrg}
|
||||
return Env {config, server, queueStore, msgStore, idsDrg}
|
||||
|
||||
35
src/QueueStore.hs
Normal file
35
src/QueueStore.hs
Normal file
@@ -0,0 +1,35 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE KindSignatures #-}
|
||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
|
||||
module QueueStore where
|
||||
|
||||
import Transmission
|
||||
|
||||
data QueueRec = QueueRec
|
||||
{ recipientId :: QueueId,
|
||||
senderId :: QueueId,
|
||||
recipientKey :: PublicKey,
|
||||
senderKey :: Maybe PublicKey,
|
||||
status :: QueueStatus
|
||||
}
|
||||
|
||||
data QueueStatus = QueueActive | QueueOff
|
||||
|
||||
class MonadQueueStore s m where
|
||||
addQueue :: s -> RecipientKey -> (RecipientId, SenderId) -> m (Either ErrorType ())
|
||||
getQueue :: s -> SParty (a :: Party) -> QueueId -> m (Either ErrorType QueueRec)
|
||||
secureQueue :: s -> RecipientId -> SenderKey -> m (Either ErrorType ())
|
||||
suspendQueue :: s -> RecipientId -> m (Either ErrorType ())
|
||||
deleteQueue :: s -> RecipientId -> m (Either ErrorType ())
|
||||
|
||||
mkQueueRec :: RecipientKey -> (RecipientId, SenderId) -> QueueRec
|
||||
mkQueueRec recipientKey (recipientId, senderId) =
|
||||
QueueRec
|
||||
{ recipientId,
|
||||
senderId,
|
||||
recipientKey,
|
||||
senderKey = Nothing,
|
||||
status = QueueActive
|
||||
}
|
||||
92
src/QueueStore/STM.hs
Normal file
92
src/QueueStore/STM.hs
Normal file
@@ -0,0 +1,92 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE FlexibleInstances #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE InstanceSigs #-}
|
||||
{-# LANGUAGE KindSignatures #-}
|
||||
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE RankNTypes #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE UndecidableInstances #-}
|
||||
|
||||
module QueueStore.STM where
|
||||
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import QueueStore
|
||||
import Transmission
|
||||
import UnliftIO.STM
|
||||
|
||||
data QueueStoreData = QueueStoreData
|
||||
{ queues :: Map RecipientId QueueRec,
|
||||
senders :: Map SenderId RecipientId
|
||||
}
|
||||
|
||||
type QueueStore = TVar QueueStoreData
|
||||
|
||||
newQueueStore :: STM QueueStore
|
||||
newQueueStore = newTVar QueueStoreData {queues = M.empty, senders = M.empty}
|
||||
|
||||
instance MonadQueueStore QueueStore STM where
|
||||
addQueue :: QueueStore -> RecipientKey -> (RecipientId, SenderId) -> STM (Either ErrorType ())
|
||||
addQueue store rKey ids@(rId, sId) = do
|
||||
cs@QueueStoreData {queues, senders} <- readTVar store
|
||||
if M.member rId queues || M.member sId senders
|
||||
then return $ Left DUPLICATE
|
||||
else do
|
||||
writeTVar store $
|
||||
cs
|
||||
{ queues = M.insert rId (mkQueueRec rKey ids) queues,
|
||||
senders = M.insert sId rId senders
|
||||
}
|
||||
return $ Right ()
|
||||
|
||||
getQueue :: QueueStore -> SParty (p :: Party) -> QueueId -> STM (Either ErrorType QueueRec)
|
||||
getQueue store SRecipient rId = do
|
||||
cs <- readTVar store
|
||||
return $ getRcpQueue cs rId
|
||||
getQueue store SSender sId = do
|
||||
cs <- readTVar store
|
||||
let rId = M.lookup sId $ senders cs
|
||||
return $ maybe (Left AUTH) (getRcpQueue cs) rId
|
||||
getQueue _ SBroker _ =
|
||||
return $ Left INTERNAL
|
||||
|
||||
secureQueue store rId sKey =
|
||||
updateQueues store rId $ \cs c ->
|
||||
case senderKey c of
|
||||
Just _ -> (Left AUTH, cs)
|
||||
_ -> (Right (), cs {queues = M.insert rId c {senderKey = Just sKey} (queues cs)})
|
||||
|
||||
suspendQueue :: QueueStore -> RecipientId -> STM (Either ErrorType ())
|
||||
suspendQueue store rId =
|
||||
updateQueues store rId $ \cs c ->
|
||||
(Right (), cs {queues = M.insert rId c {status = QueueOff} (queues cs)})
|
||||
|
||||
deleteQueue :: QueueStore -> RecipientId -> STM (Either ErrorType ())
|
||||
deleteQueue store rId =
|
||||
updateQueues store rId $ \cs c ->
|
||||
( Right (),
|
||||
cs
|
||||
{ queues = M.delete rId (queues cs),
|
||||
senders = M.delete (senderId c) (senders cs)
|
||||
}
|
||||
)
|
||||
|
||||
updateQueues ::
|
||||
QueueStore ->
|
||||
RecipientId ->
|
||||
(QueueStoreData -> QueueRec -> (Either ErrorType (), QueueStoreData)) ->
|
||||
STM (Either ErrorType ())
|
||||
updateQueues store rId update = do
|
||||
cs <- readTVar store
|
||||
let conn = getRcpQueue cs rId
|
||||
either (return . Left) (_update cs) conn
|
||||
where
|
||||
_update cs c = do
|
||||
let (res, cs') = update cs c
|
||||
writeTVar store cs'
|
||||
return res
|
||||
|
||||
getRcpQueue :: QueueStoreData -> RecipientId -> Either ErrorType QueueRec
|
||||
getRcpQueue cs rId = maybe (Left AUTH) Right . M.lookup rId $ queues cs
|
||||
119
src/Server.hs
119
src/Server.hs
@@ -12,8 +12,6 @@
|
||||
|
||||
module Server (runSMPServer) where
|
||||
|
||||
import ConnStore
|
||||
import ConnStore.STM (ConnStore)
|
||||
import Control.Concurrent.STM (stateTVar)
|
||||
import Control.Monad
|
||||
import Control.Monad.IO.Unlift
|
||||
@@ -27,6 +25,8 @@ import Data.Time.Clock
|
||||
import Env.STM
|
||||
import MsgStore
|
||||
import MsgStore.STM (MsgQueue)
|
||||
import QueueStore
|
||||
import QueueStore.STM (QueueStore)
|
||||
import Transmission
|
||||
import Transport
|
||||
import UnliftIO.Async
|
||||
@@ -57,7 +57,7 @@ runSMPServer cfg@Config {tcpPort} = do
|
||||
runClient :: (MonadUnliftIO m, MonadReader Env m) => Handle -> m ()
|
||||
runClient h = do
|
||||
putLn h "Welcome to SMP"
|
||||
q <- asks $ queueSize . config
|
||||
q <- asks $ tbqSize . config
|
||||
c <- atomically $ newClient q
|
||||
s <- asks server
|
||||
raceAny_ [send h c, client c s, receive h c]
|
||||
@@ -80,11 +80,10 @@ raceAny_ = r []
|
||||
|
||||
receive :: (MonadUnliftIO m, MonadReader Env m) => Handle -> Client -> m ()
|
||||
receive h Client {rcvQ} = forever $ do
|
||||
(signature, (connId, cmdOrError)) <- tGet fromClient h
|
||||
-- TODO maybe send Either to queue?
|
||||
(signature, (queueId, cmdOrError)) <- tGet fromClient h
|
||||
signed <- case cmdOrError of
|
||||
Left e -> return . mkResp connId $ ERR e
|
||||
Right cmd -> verifyTransmission signature connId cmd
|
||||
Left e -> return . mkResp queueId $ ERR e
|
||||
Right cmd -> verifyTransmission signature queueId cmd
|
||||
atomically $ writeTBQueue rcvQ signed
|
||||
|
||||
send :: MonadUnliftIO m => Handle -> Client -> m ()
|
||||
@@ -92,22 +91,22 @@ send h Client {sndQ} = forever $ do
|
||||
signed <- atomically $ readTBQueue sndQ
|
||||
tPut h (B.empty, signed)
|
||||
|
||||
mkResp :: ConnId -> Command 'Broker -> Signed
|
||||
mkResp connId command = (connId, Cmd SBroker command)
|
||||
mkResp :: QueueId -> Command 'Broker -> Signed
|
||||
mkResp queueId command = (queueId, Cmd SBroker command)
|
||||
|
||||
verifyTransmission :: forall m. (MonadUnliftIO m, MonadReader Env m) => Signature -> ConnId -> Cmd -> m Signed
|
||||
verifyTransmission signature connId cmd = do
|
||||
(connId,) <$> case cmd of
|
||||
verifyTransmission :: forall m. (MonadUnliftIO m, MonadReader Env m) => Signature -> QueueId -> Cmd -> m Signed
|
||||
verifyTransmission signature queueId cmd = do
|
||||
(queueId,) <$> case cmd of
|
||||
Cmd SBroker _ -> return $ smpErr INTERNAL -- it can only be client command, because `fromClient` was used
|
||||
Cmd SRecipient (CONN _) -> return cmd
|
||||
Cmd SRecipient _ -> withConnection SRecipient $ verifySignature . recipientKey
|
||||
Cmd SSender (SEND _) -> withConnection SSender $ verifySend . senderKey
|
||||
Cmd SRecipient (NEW _) -> return cmd
|
||||
Cmd SRecipient _ -> withQueueRec SRecipient $ verifySignature . recipientKey
|
||||
Cmd SSender (SEND _) -> withQueueRec SSender $ verifySend . senderKey
|
||||
where
|
||||
withConnection :: SParty (p :: Party) -> (Connection -> m Cmd) -> m Cmd
|
||||
withConnection party f = do
|
||||
store <- asks connStore
|
||||
conn <- atomically $ getConn store party connId
|
||||
either (return . smpErr) f conn
|
||||
withQueueRec :: SParty (p :: Party) -> (QueueRec -> m Cmd) -> m Cmd
|
||||
withQueueRec party f = do
|
||||
st <- asks queueStore
|
||||
qr <- atomically $ getQueue st party queueId
|
||||
either (return . smpErr) f qr
|
||||
verifySend :: Maybe PublicKey -> m Cmd
|
||||
verifySend
|
||||
| B.null signature = return . maybe cmd (const authErr)
|
||||
@@ -127,44 +126,44 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
|
||||
>>= atomically . writeTBQueue sndQ
|
||||
where
|
||||
processCommand :: Signed -> m Signed
|
||||
processCommand (connId, cmd) = do
|
||||
st <- asks connStore
|
||||
processCommand (queueId, cmd) = do
|
||||
st <- asks queueStore
|
||||
case cmd of
|
||||
Cmd SBroker END -> unsubscribeConn $> (connId, cmd)
|
||||
Cmd SBroker _ -> return (connId, cmd)
|
||||
Cmd SBroker END -> unsubscribeQueue $> (queueId, cmd)
|
||||
Cmd SBroker _ -> return (queueId, cmd)
|
||||
Cmd SSender (SEND msgBody) -> sendMessage st msgBody
|
||||
Cmd SRecipient command -> case command of
|
||||
CONN rKey -> createConn st rKey
|
||||
SUB -> subscribeConn connId
|
||||
NEW rKey -> createQueue st rKey
|
||||
SUB -> subscribeQueue queueId
|
||||
ACK -> acknowledgeMsg
|
||||
KEY sKey -> okResp <$> atomically (secureConn st connId sKey)
|
||||
OFF -> okResp <$> atomically (suspendConn st connId)
|
||||
DEL -> delConnAndMsgs st
|
||||
KEY sKey -> okResp <$> atomically (secureQueue st queueId sKey)
|
||||
OFF -> okResp <$> atomically (suspendQueue st queueId)
|
||||
DEL -> delQueueAndMsgs st
|
||||
where
|
||||
createConn :: ConnStore -> RecipientKey -> m Signed
|
||||
createConn st rKey = mkResp B.empty <$> addSubscribe
|
||||
createQueue :: QueueStore -> RecipientKey -> m Signed
|
||||
createQueue st rKey = mkResp B.empty <$> addSubscribe
|
||||
where
|
||||
addSubscribe =
|
||||
addConnRetry 3 >>= \case
|
||||
addQueueRetry 3 >>= \case
|
||||
Left e -> return $ ERR e
|
||||
Right (rId, sId) -> subscribeConn rId $> IDS rId sId
|
||||
Right (rId, sId) -> subscribeQueue rId $> IDS rId sId
|
||||
|
||||
addConnRetry :: Int -> m (Either ErrorType (RecipientId, SenderId))
|
||||
addConnRetry 0 = return $ Left INTERNAL
|
||||
addConnRetry n = do
|
||||
addQueueRetry :: Int -> m (Either ErrorType (RecipientId, SenderId))
|
||||
addQueueRetry 0 = return $ Left INTERNAL
|
||||
addQueueRetry n = do
|
||||
ids <- getIds
|
||||
atomically (addConn st rKey ids) >>= \case
|
||||
Left DUPLICATE -> addConnRetry $ n - 1
|
||||
atomically (addQueue st rKey ids) >>= \case
|
||||
Left DUPLICATE -> addQueueRetry $ n - 1
|
||||
Left e -> return $ Left e
|
||||
Right _ -> return $ Right ids
|
||||
|
||||
getIds :: m (RecipientId, SenderId)
|
||||
getIds = do
|
||||
n <- asks $ connIdBytes . config
|
||||
n <- asks $ queueIdBytes . config
|
||||
liftM2 (,) (randomId n) (randomId n)
|
||||
|
||||
subscribeConn :: RecipientId -> m Signed
|
||||
subscribeConn rId =
|
||||
subscribeQueue :: RecipientId -> m Signed
|
||||
subscribeQueue rId =
|
||||
atomically (getSubscription rId) >>= deliverMessage tryPeekMsg rId
|
||||
|
||||
getSubscription :: RecipientId -> STM Sub
|
||||
@@ -178,26 +177,26 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
|
||||
writeTVar subscriptions $ M.insert rId s subs
|
||||
return s
|
||||
|
||||
unsubscribeConn :: m ()
|
||||
unsubscribeConn = do
|
||||
unsubscribeQueue :: m ()
|
||||
unsubscribeQueue = do
|
||||
sub <- atomically . stateTVar subscriptions $
|
||||
\cs -> (M.lookup connId cs, M.delete connId cs)
|
||||
\cs -> (M.lookup queueId cs, M.delete queueId cs)
|
||||
mapM_ cancelSub sub
|
||||
|
||||
acknowledgeMsg :: m Signed
|
||||
acknowledgeMsg =
|
||||
atomically (withSub connId $ \s -> const s <$$> tryTakeTMVar (delivered s))
|
||||
atomically (withSub queueId $ \s -> const s <$$> tryTakeTMVar (delivered s))
|
||||
>>= \case
|
||||
Just (Just s) -> deliverMessage tryDelPeekMsg connId s
|
||||
Just (Just s) -> deliverMessage tryDelPeekMsg queueId s
|
||||
_ -> return $ err PROHIBITED
|
||||
|
||||
withSub :: RecipientId -> (Sub -> STM a) -> STM (Maybe a)
|
||||
withSub rId f = readTVar subscriptions >>= mapM f . M.lookup rId
|
||||
|
||||
sendMessage :: ConnStore -> MsgBody -> m Signed
|
||||
sendMessage :: QueueStore -> MsgBody -> m Signed
|
||||
sendMessage st msgBody = do
|
||||
conn <- atomically $ getConn st SSender connId
|
||||
either (return . err) storeMessage conn
|
||||
qr <- atomically $ getQueue st SSender queueId
|
||||
either (return . err) storeMessage qr
|
||||
where
|
||||
mkMessage :: m Message
|
||||
mkMessage = do
|
||||
@@ -205,14 +204,14 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
|
||||
ts <- liftIO getCurrentTime
|
||||
return $ Message {msgId, ts, msgBody}
|
||||
|
||||
storeMessage :: Connection -> m Signed
|
||||
storeMessage c = case status c of
|
||||
ConnOff -> return $ err AUTH
|
||||
ConnActive -> do
|
||||
storeMessage :: QueueRec -> m Signed
|
||||
storeMessage qr = case status qr of
|
||||
QueueOff -> return $ err AUTH
|
||||
QueueActive -> do
|
||||
ms <- asks msgStore
|
||||
msg <- mkMessage
|
||||
atomically $ do
|
||||
q <- getMsgQueue ms (recipientId c)
|
||||
q <- getMsgQueue ms (recipientId qr)
|
||||
writeMsg q msg
|
||||
return ok
|
||||
|
||||
@@ -247,19 +246,19 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
|
||||
setDelivered :: STM (Maybe Bool)
|
||||
setDelivered = withSub rId $ \s -> tryPutTMVar (delivered s) ()
|
||||
|
||||
delConnAndMsgs :: ConnStore -> m Signed
|
||||
delConnAndMsgs st = do
|
||||
delQueueAndMsgs :: QueueStore -> m Signed
|
||||
delQueueAndMsgs st = do
|
||||
ms <- asks msgStore
|
||||
atomically $
|
||||
deleteConn st connId >>= \case
|
||||
deleteQueue st queueId >>= \case
|
||||
Left e -> return $ err e
|
||||
Right _ -> delMsgQueue ms connId $> ok
|
||||
Right _ -> delMsgQueue ms queueId $> ok
|
||||
|
||||
ok :: Signed
|
||||
ok = mkResp connId OK
|
||||
ok = mkResp queueId OK
|
||||
|
||||
err :: ErrorType -> Signed
|
||||
err = mkResp connId . ERR
|
||||
err = mkResp queueId . ERR
|
||||
|
||||
okResp :: Either ErrorType () -> Signed
|
||||
okResp = either err $ const ok
|
||||
|
||||
@@ -33,18 +33,18 @@ data Cmd where
|
||||
|
||||
deriving instance Show Cmd
|
||||
|
||||
type Signed = (ConnId, Cmd)
|
||||
type Signed = (QueueId, Cmd)
|
||||
|
||||
type Transmission = (Signature, Signed)
|
||||
|
||||
type SignedOrError = (ConnId, Either ErrorType Cmd)
|
||||
type SignedOrError = (QueueId, Either ErrorType Cmd)
|
||||
|
||||
type TransmissionOrError = (Signature, SignedOrError)
|
||||
|
||||
type RawTransmission = (ByteString, ByteString, ByteString)
|
||||
|
||||
data Command (a :: Party) where
|
||||
CONN :: RecipientKey -> Command Recipient
|
||||
NEW :: RecipientKey -> Command Recipient
|
||||
SUB :: Command Recipient
|
||||
KEY :: SenderKey -> Command Recipient
|
||||
ACK :: Command Recipient
|
||||
@@ -63,8 +63,8 @@ deriving instance Eq (Command a)
|
||||
|
||||
parseCommand :: ByteString -> Either ErrorType Cmd
|
||||
parseCommand command = case B.words command of
|
||||
["CONN", rKeyStr] -> case decode rKeyStr of
|
||||
Right rKey -> rCmd $ CONN rKey
|
||||
["NEW", rKeyStr] -> case decode rKeyStr of
|
||||
Right rKey -> rCmd $ NEW rKey
|
||||
_ -> errParams
|
||||
["SUB"] -> rCmd SUB
|
||||
["KEY", sKeyStr] -> case decode sKeyStr of
|
||||
@@ -95,7 +95,7 @@ parseCommand command = case B.words command of
|
||||
["AUTH"] -> bErr AUTH
|
||||
["INTERNAL"] -> bErr INTERNAL
|
||||
_ -> errParams
|
||||
"CONN" : _ -> errParams
|
||||
"NEW" : _ -> errParams
|
||||
"SUB" : _ -> errParams
|
||||
"KEY" : _ -> errParams
|
||||
"ACK" : _ -> errParams
|
||||
@@ -123,7 +123,7 @@ zero = ord '0'
|
||||
|
||||
serializeCommand :: Cmd -> ByteString
|
||||
serializeCommand = \case
|
||||
Cmd SRecipient (CONN rKey) -> "CONN " <> encode rKey
|
||||
Cmd SRecipient (NEW rKey) -> "NEW " <> encode rKey
|
||||
Cmd SRecipient (KEY sKey) -> "KEY " <> encode sKey
|
||||
Cmd SRecipient cmd -> B.pack $ show cmd
|
||||
Cmd SSender (SEND msgBody) -> "SEND" <> serializeMsg msgBody
|
||||
@@ -145,11 +145,11 @@ type RecipientKey = PublicKey
|
||||
|
||||
type SenderKey = PublicKey
|
||||
|
||||
type RecipientId = ConnId
|
||||
type RecipientId = QueueId
|
||||
|
||||
type SenderId = ConnId
|
||||
type SenderId = QueueId
|
||||
|
||||
type ConnId = Encoded
|
||||
type QueueId = Encoded
|
||||
|
||||
type MsgId = Encoded
|
||||
|
||||
|
||||
@@ -80,20 +80,20 @@ getBytes :: MonadIO m => Handle -> Int -> m ByteString
|
||||
getBytes h = liftIO . B.hGet h
|
||||
|
||||
tPutRaw :: MonadIO m => Handle -> RawTransmission -> m ()
|
||||
tPutRaw h (signature, connId, command) = do
|
||||
tPutRaw h (signature, queueId, command) = do
|
||||
putLn h (encode signature)
|
||||
putLn h (encode connId)
|
||||
putLn h (encode queueId)
|
||||
putLn h command
|
||||
|
||||
tGetRaw :: MonadIO m => Handle -> m (Either String RawTransmission)
|
||||
tGetRaw h = do
|
||||
signature <- decode <$> getLn h
|
||||
connId <- decode <$> getLn h
|
||||
queueId <- decode <$> getLn h
|
||||
command <- getLn h
|
||||
return $ liftM2 (,,command) signature connId
|
||||
return $ liftM2 (,,command) signature queueId
|
||||
|
||||
tPut :: MonadIO m => Handle -> Transmission -> m ()
|
||||
tPut h (signature, (connId, command)) = tPutRaw h (signature, connId, serializeCommand command)
|
||||
tPut h (signature, (queueId, command)) = tPutRaw h (signature, queueId, serializeCommand command)
|
||||
|
||||
fromClient :: Cmd -> Either ErrorType Cmd
|
||||
fromClient = \case
|
||||
@@ -114,32 +114,32 @@ tGet fromParty h = tGetRaw h >>= either (const tError) tParseLoadBody
|
||||
tError = return (B.empty, (B.empty, Left $ SYNTAX errBadTransmission))
|
||||
|
||||
tParseLoadBody :: RawTransmission -> m TransmissionOrError
|
||||
tParseLoadBody t@(signature, connId, command) = do
|
||||
tParseLoadBody t@(signature, queueId, command) = do
|
||||
let cmd = parseCommand command >>= fromParty >>= tCredentials t
|
||||
fullCmd <- either (return . Left) cmdWithMsgBody cmd
|
||||
return (signature, (connId, fullCmd))
|
||||
return (signature, (queueId, fullCmd))
|
||||
|
||||
tCredentials :: RawTransmission -> Cmd -> Either ErrorType Cmd
|
||||
tCredentials (signature, connId, _) cmd = case cmd of
|
||||
-- IDS response should not have connection ID
|
||||
tCredentials (signature, queueId, _) cmd = case cmd of
|
||||
-- IDS response should not have queue ID
|
||||
Cmd SBroker (IDS _ _) -> Right cmd
|
||||
-- ERROR response does not always have connection ID
|
||||
-- ERROR response does not always have queue ID
|
||||
Cmd SBroker (ERR _) -> Right cmd
|
||||
-- other responses must have connection ID
|
||||
-- other responses must have queue ID
|
||||
Cmd SBroker _
|
||||
| B.null connId -> Left $ SYNTAX errNoConnectionId
|
||||
| B.null queueId -> Left $ SYNTAX errNoConnectionId
|
||||
| otherwise -> Right cmd
|
||||
-- CREATE must NOT have signature or connection ID
|
||||
Cmd SRecipient (CONN _)
|
||||
| B.null signature && B.null connId -> Right cmd
|
||||
-- CREATE must NOT have signature or queue ID
|
||||
Cmd SRecipient (NEW _)
|
||||
| B.null signature && B.null queueId -> Right cmd
|
||||
| otherwise -> Left $ SYNTAX errHasCredentials
|
||||
-- SEND must have connection ID, signature is not always required
|
||||
-- SEND must have queue ID, signature is not always required
|
||||
Cmd SSender (SEND _)
|
||||
| B.null connId -> Left $ SYNTAX errNoConnectionId
|
||||
| B.null queueId -> Left $ SYNTAX errNoConnectionId
|
||||
| otherwise -> Right cmd
|
||||
-- other client commands must have both signature and connection ID
|
||||
-- other client commands must have both signature and queue ID
|
||||
Cmd SRecipient _
|
||||
| B.null signature || B.null connId -> Left $ SYNTAX errNoCredentials
|
||||
| B.null signature || B.null queueId -> Left $ SYNTAX errNoCredentials
|
||||
| otherwise -> Right cmd
|
||||
|
||||
cmdWithMsgBody :: Cmd -> m (Either ErrorType Cmd)
|
||||
|
||||
@@ -35,8 +35,8 @@ cfg :: Config
|
||||
cfg =
|
||||
Config
|
||||
{ tcpPort = testPort,
|
||||
queueSize = 1,
|
||||
connIdBytes = 12,
|
||||
tbqSize = 1,
|
||||
queueIdBytes = 12,
|
||||
msgIdBytes = 6
|
||||
}
|
||||
|
||||
|
||||
@@ -20,18 +20,18 @@ import Transport
|
||||
main :: IO ()
|
||||
main = hspec do
|
||||
describe "SMP syntax" syntaxTests
|
||||
describe "SMP connections" do
|
||||
describe "CONN and KEY commands, SEND messages" testCreateSecure
|
||||
describe "CONN, OFF and DEL commands, SEND messages" testCreateDelete
|
||||
describe "SMP queues" do
|
||||
describe "NEW and KEY commands, SEND messages" testCreateSecure
|
||||
describe "NEW, OFF and DEL commands, SEND messages" testCreateDelete
|
||||
describe "SMP messages" do
|
||||
describe "duplex communication over 2 SMP connections" testDuplex
|
||||
describe "switch subscription to another SMP connection" testSwitchSub
|
||||
describe "switch subscription to another SMP queue" testSwitchSub
|
||||
|
||||
pattern Resp :: ConnId -> Command 'Broker -> TransmissionOrError
|
||||
pattern Resp connId command = ("", (connId, Right (Cmd SBroker command)))
|
||||
pattern Resp :: QueueId -> Command 'Broker -> TransmissionOrError
|
||||
pattern Resp queueId command = ("", (queueId, Right (Cmd SBroker command)))
|
||||
|
||||
sendRecv :: Handle -> RawTransmission -> IO TransmissionOrError
|
||||
sendRecv h (sgn, cId, cmd) = tPutRaw h (fromRight "" $ decode sgn, cId, cmd) >> tGet fromServer h
|
||||
sendRecv h (sgn, qId, cmd) = tPutRaw h (fromRight "" $ decode sgn, qId, cmd) >> tGet fromServer h
|
||||
|
||||
(>#>) :: [RawTransmission] -> [RawTransmission] -> Expectation
|
||||
commands >#> responses = smpServerTest commands `shouldReturn` responses
|
||||
@@ -41,14 +41,14 @@ commands >#> responses = smpServerTest commands `shouldReturn` responses
|
||||
|
||||
testCreateSecure :: SpecWith ()
|
||||
testCreateSecure =
|
||||
it "should create (CONN) and secure (KEY) connection" $
|
||||
it "should create (NEW) and secure (KEY) queue" $
|
||||
smpTest \h -> do
|
||||
Resp rId1 (IDS rId sId) <- sendRecv h ("", "", "CONN 1234")
|
||||
(rId1, "") #== "creates connection"
|
||||
Resp rId1 (IDS rId sId) <- sendRecv h ("", "", "NEW 1234")
|
||||
(rId1, "") #== "creates queue"
|
||||
|
||||
Resp sId1 ok1 <- sendRecv h ("", sId, "SEND :hello")
|
||||
(ok1, OK) #== "accepts unsigned SEND"
|
||||
(sId1, sId) #== "same connection ID in response 1"
|
||||
(sId1, sId) #== "same queue ID in response 1"
|
||||
|
||||
Resp _ (MSG _ _ msg1) <- tGet fromServer h
|
||||
(msg1, "hello") #== "delivers message"
|
||||
@@ -61,7 +61,7 @@ testCreateSecure =
|
||||
|
||||
Resp sId2 err1 <- sendRecv h ("4567", sId, "SEND :hello")
|
||||
(err1, ERR AUTH) #== "rejects signed SEND"
|
||||
(sId2, sId) #== "same connection ID in response 2"
|
||||
(sId2, sId) #== "same queue ID in response 2"
|
||||
|
||||
Resp _ err2 <- sendRecv h ("12345678", rId, "KEY 4567")
|
||||
(err2, ERR AUTH) #== "rejects KEY with wrong signature (password atm)"
|
||||
@@ -70,8 +70,8 @@ testCreateSecure =
|
||||
(err3, ERR AUTH) #== "rejects KEY with sender's ID"
|
||||
|
||||
Resp rId2 ok2 <- sendRecv h ("1234", rId, "KEY 4567")
|
||||
(ok2, OK) #== "secures connection"
|
||||
(rId2, rId) #== "same connection ID in response 3"
|
||||
(ok2, OK) #== "secures queue"
|
||||
(rId2, rId) #== "same queue ID in response 3"
|
||||
|
||||
Resp _ err4 <- sendRecv h ("1234", rId, "KEY 4567")
|
||||
(err4, ERR AUTH) #== "rejects KEY if already secured"
|
||||
@@ -90,13 +90,13 @@ testCreateSecure =
|
||||
|
||||
testCreateDelete :: SpecWith ()
|
||||
testCreateDelete =
|
||||
it "should create (CONN), suspend (OFF) and delete (DEL) connection" $
|
||||
it "should create (NEW), suspend (OFF) and delete (DEL) queue" $
|
||||
smpTest2 \rh sh -> do
|
||||
Resp rId1 (IDS rId sId) <- sendRecv rh ("", "", "CONN 1234")
|
||||
(rId1, "") #== "creates connection"
|
||||
Resp rId1 (IDS rId sId) <- sendRecv rh ("", "", "NEW 1234")
|
||||
(rId1, "") #== "creates queue"
|
||||
|
||||
Resp _ ok1 <- sendRecv rh ("1234", rId, "KEY 4567")
|
||||
(ok1, OK) #== "secures connection"
|
||||
(ok1, OK) #== "secures queue"
|
||||
|
||||
Resp _ ok2 <- sendRecv sh ("4567", sId, "SEND :hello")
|
||||
(ok2, OK) #== "accepts signed SEND"
|
||||
@@ -114,8 +114,8 @@ testCreateDelete =
|
||||
(err2, ERR AUTH) #== "rejects OFF with sender's ID"
|
||||
|
||||
Resp rId2 ok3 <- sendRecv rh ("1234", rId, "OFF")
|
||||
(ok3, OK) #== "suspends connection"
|
||||
(rId2, rId) #== "same connection ID in response 2"
|
||||
(ok3, OK) #== "suspends queue"
|
||||
(rId2, rId) #== "same queue ID in response 2"
|
||||
|
||||
Resp _ err3 <- sendRecv sh ("4567", sId, "SEND :hello")
|
||||
(err3, ERR AUTH) #== "rejects signed SEND"
|
||||
@@ -136,8 +136,8 @@ testCreateDelete =
|
||||
(err6, ERR AUTH) #== "rejects DEL with sender's ID"
|
||||
|
||||
Resp rId3 ok6 <- sendRecv rh ("1234", rId, "DEL")
|
||||
(ok6, OK) #== "deletes connection"
|
||||
(rId3, rId) #== "same connection ID in response 3"
|
||||
(ok6, OK) #== "deletes queue"
|
||||
(rId3, rId) #== "same queue ID in response 3"
|
||||
|
||||
Resp _ err7 <- sendRecv sh ("4567", sId, "SEND :hello")
|
||||
(err7, ERR AUTH) #== "rejects signed SEND when deleted"
|
||||
@@ -158,7 +158,7 @@ testDuplex :: SpecWith ()
|
||||
testDuplex =
|
||||
it "should create 2 simplex connections and exchange messages" $
|
||||
smpTest2 \alice bob -> do
|
||||
Resp _ (IDS aRcv aSnd) <- sendRecv alice ("", "", "CONN 1234")
|
||||
Resp _ (IDS aRcv aSnd) <- sendRecv alice ("", "", "NEW 1234")
|
||||
-- aSnd ID is passed to Bob out-of-band
|
||||
|
||||
Resp _ OK <- sendRecv bob ("", aSnd, "SEND :key efgh")
|
||||
@@ -170,14 +170,14 @@ testDuplex =
|
||||
(key1, "efgh") #== "key received from Bob"
|
||||
Resp _ OK <- sendRecv alice ("1234", aRcv, "KEY " <> key1)
|
||||
|
||||
Resp _ (IDS bRcv bSnd) <- sendRecv bob ("", "", "CONN abcd")
|
||||
Resp _ (IDS bRcv bSnd) <- sendRecv bob ("", "", "NEW abcd")
|
||||
Resp _ OK <- sendRecv bob ("efgh", aSnd, "SEND :reply_id " <> encode bSnd)
|
||||
-- "reply_id ..." is ad-hoc, it is not a part of SMP protocol
|
||||
|
||||
Resp _ (MSG _ _ msg2) <- tGet fromServer alice
|
||||
Resp _ OK <- sendRecv alice ("1234", aRcv, "ACK")
|
||||
["reply_id", bId] <- return $ B.words msg2
|
||||
(bId, encode bSnd) #== "reply connection ID received from Bob"
|
||||
(bId, encode bSnd) #== "reply queue ID received from Bob"
|
||||
Resp _ OK <- sendRecv alice ("", bSnd, "SEND :key 5678")
|
||||
-- "key 5678" is ad-hoc, different from SMP protocol
|
||||
|
||||
@@ -203,7 +203,7 @@ testSwitchSub :: SpecWith ()
|
||||
testSwitchSub =
|
||||
it "should create simplex connections and switch subscription to another TCP connection" $
|
||||
smpTest3 \rh1 rh2 sh -> do
|
||||
Resp _ (IDS rId sId) <- sendRecv rh1 ("", "", "CONN 1234")
|
||||
Resp _ (IDS rId sId) <- sendRecv rh1 ("", "", "NEW 1234")
|
||||
Resp _ ok1 <- sendRecv sh ("", sId, "SEND :test1")
|
||||
(ok1, OK) #== "sent test message 1"
|
||||
Resp _ ok2 <- sendRecv sh ("", sId, "SEND :test2, no ACK")
|
||||
@@ -215,11 +215,11 @@ testSwitchSub =
|
||||
(msg2, "test2, no ACK") #== "test message 2 delivered, no ACK"
|
||||
|
||||
Resp _ (MSG _ _ msg2') <- sendRecv rh2 ("1234", rId, "SUB")
|
||||
(msg2', "test2, no ACK") #== "same simplex connection via another TCP connection, tes2 delivered again (no ACK in 1st connection)"
|
||||
(msg2', "test2, no ACK") #== "same simplex queue via another TCP connection, tes2 delivered again (no ACK in 1st queue)"
|
||||
Resp _ OK <- sendRecv rh2 ("1234", rId, "ACK")
|
||||
|
||||
Resp _ end <- tGet fromServer rh1
|
||||
(end, END) #== "unsubscribed the 1st connection"
|
||||
(end, END) #== "unsubscribed the 1st TCP connection"
|
||||
|
||||
Resp _ OK <- sendRecv sh ("", sId, "SEND :test3")
|
||||
|
||||
@@ -234,22 +234,22 @@ testSwitchSub =
|
||||
|
||||
timeout 1000 (tGet fromServer rh1) >>= \case
|
||||
Nothing -> return ()
|
||||
Just _ -> error "nothing else is delivered to the 1st TCPconnection"
|
||||
Just _ -> error "nothing else is delivered to the 1st TCP connection"
|
||||
|
||||
syntaxTests :: SpecWith ()
|
||||
syntaxTests = do
|
||||
it "unknown command" $ [("", "1234", "HELLO")] >#> [("", "1234", "ERR UNKNOWN")]
|
||||
describe "CONN" do
|
||||
it "no parameters" $ [("", "", "CONN")] >#> [("", "", "ERR SYNTAX 2")]
|
||||
it "many parameters" $ [("", "", "CONN 1 2")] >#> [("", "", "ERR SYNTAX 2")]
|
||||
it "has signature" $ [("1234", "", "CONN 1234")] >#> [("", "", "ERR SYNTAX 4")]
|
||||
it "connection ID" $ [("", "1", "CONN 1234")] >#> [("", "1", "ERR SYNTAX 4")]
|
||||
describe "NEW" do
|
||||
it "no parameters" $ [("", "", "NEW")] >#> [("", "", "ERR SYNTAX 2")]
|
||||
it "many parameters" $ [("", "", "NEW 1 2")] >#> [("", "", "ERR SYNTAX 2")]
|
||||
it "has signature" $ [("1234", "", "NEW 1234")] >#> [("", "", "ERR SYNTAX 4")]
|
||||
it "queue ID" $ [("", "1", "NEW 1234")] >#> [("", "1", "ERR SYNTAX 4")]
|
||||
describe "KEY" do
|
||||
it "valid syntax" $ [("1234", "1", "KEY 4567")] >#> [("", "1", "ERR AUTH")]
|
||||
it "no parameters" $ [("1234", "1", "KEY")] >#> [("", "1", "ERR SYNTAX 2")]
|
||||
it "many parameters" $ [("1234", "1", "KEY 1 2")] >#> [("", "1", "ERR SYNTAX 2")]
|
||||
it "no signature" $ [("", "1", "KEY 4567")] >#> [("", "1", "ERR SYNTAX 3")]
|
||||
it "no connection ID" $ [("1234", "", "KEY 4567")] >#> [("", "", "ERR SYNTAX 3")]
|
||||
it "no queue ID" $ [("1234", "", "KEY 4567")] >#> [("", "", "ERR SYNTAX 3")]
|
||||
noParamsSyntaxTest "SUB"
|
||||
noParamsSyntaxTest "ACK"
|
||||
noParamsSyntaxTest "OFF"
|
||||
@@ -258,7 +258,7 @@ syntaxTests = do
|
||||
it "valid syntax 1" $ [("1234", "1", "SEND :hello")] >#> [("", "1", "ERR AUTH")]
|
||||
it "valid syntax 2" $ [("1234", "1", "SEND 11\nhello there\n")] >#> [("", "1", "ERR AUTH")]
|
||||
it "no parameters" $ [("1234", "1", "SEND")] >#> [("", "1", "ERR SYNTAX 2")]
|
||||
it "no connection ID" $ [("1234", "", "SEND :hello")] >#> [("", "", "ERR SYNTAX 5")]
|
||||
it "no queue ID" $ [("1234", "", "SEND :hello")] >#> [("", "", "ERR SYNTAX 5")]
|
||||
it "bad message body 1" $ [("1234", "1", "SEND 11 hello")] >#> [("", "1", "ERR SYNTAX 6")]
|
||||
it "bad message body 2" $ [("1234", "1", "SEND hello")] >#> [("", "1", "ERR SYNTAX 6")]
|
||||
it "bigger body" $ [("1234", "1", "SEND 4\nhello\n")] >#> [("", "1", "ERR SIZE")]
|
||||
@@ -270,4 +270,4 @@ syntaxTests = do
|
||||
it "valid syntax" $ [("1234", "1", cmd)] >#> [("", "1", "ERR AUTH")]
|
||||
it "parameters" $ [("1234", "1", cmd <> " 1")] >#> [("", "1", "ERR SYNTAX 2")]
|
||||
it "no signature" $ [("", "1", cmd)] >#> [("", "1", "ERR SYNTAX 3")]
|
||||
it "no connection ID" $ [("1234", "", cmd)] >#> [("", "", "ERR SYNTAX 3")]
|
||||
it "no queue ID" $ [("1234", "", cmd)] >#> [("", "", "ERR SYNTAX 3")]
|
||||
|
||||
Reference in New Issue
Block a user