mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 20:45:52 +00:00
sign server responses
This commit is contained in:
@@ -24,7 +24,7 @@ import Simplex.Messaging.Server (runSMPServer)
|
||||
import Simplex.Messaging.Server.Env.STM
|
||||
import Simplex.Messaging.Server.StoreLog (StoreLog, openReadStoreLog, storeLogFilePath)
|
||||
import Simplex.Messaging.Transport (ATransport (..), TLS, Transport (..))
|
||||
import Simplex.Messaging.Transport.WebSockets (WS)
|
||||
-- import Simplex.Messaging.Transport.WebSockets (WS)
|
||||
import System.Directory (createDirectoryIfMissing, doesFileExist, removeFile)
|
||||
import System.Exit (exitFailure)
|
||||
import System.FilePath (combine)
|
||||
|
||||
@@ -206,7 +206,7 @@ getSMPClient smpServer cfg@SMPClientConfig {qSize, tcpTimeout, smpPing, smpBlock
|
||||
|
||||
process :: SMPClient -> IO ()
|
||||
process SMPClient {rcvQ, sentCommands} = forever $ do
|
||||
(_, _, (_, corrId, qId, respOrErr)) <- atomically $ readTBQueue rcvQ
|
||||
(_, _, (corrId, qId, respOrErr)) <- atomically $ readTBQueue rcvQ
|
||||
if B.null $ bs corrId
|
||||
then sendMsg qId respOrErr
|
||||
else do
|
||||
@@ -349,7 +349,7 @@ okSMPCommand cmd c pKey qId =
|
||||
sendSMPCommand :: SMPClient -> Maybe C.APrivateSignKey -> QueueId -> ClientCmd -> ExceptT SMPClientError IO (Command 'Broker)
|
||||
sendSMPCommand SMPClient {sndQ, sentCommands, clientCorrId, sndSessionId, tcpTimeout} pKey qId cmd = do
|
||||
corrId <- lift_ getNextCorrId
|
||||
t <- signTransmission $ serializeTransmission (sndSessionId, corrId, qId, cmd)
|
||||
t <- signTransmission $ serializeTransmission sndSessionId (corrId, qId, cmd)
|
||||
ExceptT $ sendRecv corrId t
|
||||
where
|
||||
lift_ :: STM a -> ExceptT SMPClientError IO a
|
||||
|
||||
@@ -31,6 +31,7 @@ module Simplex.Messaging.Protocol
|
||||
Command (..),
|
||||
CommandI (..),
|
||||
Party (..),
|
||||
ClientParty (..),
|
||||
Cmd (..),
|
||||
ClientCmd (..),
|
||||
SParty (..),
|
||||
@@ -129,25 +130,29 @@ instance PartyI Sender where sParty = SSender
|
||||
|
||||
instance PartyI Notifier where sParty = SNotifier
|
||||
|
||||
data ClientParty = forall p. IsClient p => CP (SParty p)
|
||||
|
||||
-- | Type for command or response of any participant.
|
||||
data Cmd = forall p. PartyI p => Cmd (SParty p) (Command p)
|
||||
|
||||
deriving instance Show Cmd
|
||||
|
||||
-- | Type for command or response of any participant.
|
||||
data ClientCmd = forall p. (PartyI p, ClientParty p) => ClientCmd (SParty p) (Command p)
|
||||
data ClientCmd = forall p. (PartyI p, IsClient p) => ClientCmd (SParty p) (Command p)
|
||||
|
||||
class CommandI c where
|
||||
serializeCommand :: c -> ByteString
|
||||
commandP :: Parser c
|
||||
|
||||
-- | SMP transmission without signature.
|
||||
type Transmission c = (SessionId, CorrId, QueueId, c)
|
||||
-- | Parsed SMP transmission without signature, size and session ID.
|
||||
type Transmission c = (CorrId, QueueId, c)
|
||||
|
||||
type BrokerTransmission = Transmission (Command Broker)
|
||||
|
||||
-- | signed parsed transmission, with original raw bytes and parsing error.
|
||||
type SignedTransmission c = (Maybe C.ASignature, ByteString, Transmission (Either ErrorType c))
|
||||
type SignedTransmission c = (Maybe C.ASignature, Signed, Transmission (Either ErrorType c))
|
||||
|
||||
type Signed = ByteString
|
||||
|
||||
-- | unparsed SMP transmission with signature.
|
||||
data RawTransmission = RawTransmission
|
||||
@@ -159,8 +164,8 @@ data RawTransmission = RawTransmission
|
||||
command :: ByteString
|
||||
}
|
||||
|
||||
-- | unparsed sent SMP transmission with signature.
|
||||
type SignedRawTransmission = (Maybe C.ASignature, ByteString, ByteString, ByteString, ByteString)
|
||||
-- | unparsed sent SMP transmission with signature, without session ID.
|
||||
type SignedRawTransmission = (Maybe C.ASignature, ByteString, ByteString, ByteString)
|
||||
|
||||
-- | unparsed sent SMP transmission with signature.
|
||||
type SentRawTransmission = (Maybe C.ASignature, ByteString)
|
||||
@@ -206,15 +211,15 @@ deriving instance Show (Command a)
|
||||
|
||||
deriving instance Eq (Command a)
|
||||
|
||||
type family ClientParty p :: Constraint where
|
||||
ClientParty Recipient = ()
|
||||
ClientParty Sender = ()
|
||||
ClientParty Notifier = ()
|
||||
ClientParty p =
|
||||
type family IsClient p :: Constraint where
|
||||
IsClient Recipient = ()
|
||||
IsClient Sender = ()
|
||||
IsClient Notifier = ()
|
||||
IsClient p =
|
||||
(Int ~ Bool, TypeError (Text "Party " :<>: ShowType p :<>: Text " is not a Client"))
|
||||
|
||||
clientParty :: SParty p -> Maybe (Dict (ClientParty p))
|
||||
clientParty = \case
|
||||
isClient :: SParty p -> Maybe (Dict (IsClient p))
|
||||
isClient = \case
|
||||
SRecipient -> Just Dict
|
||||
SSender -> Just Dict
|
||||
SNotifier -> Just Dict
|
||||
@@ -378,7 +383,7 @@ instance CommandI ClientCmd where
|
||||
commandP = clientCmd <$?> commandP
|
||||
where
|
||||
clientCmd :: Cmd -> Either String ClientCmd
|
||||
clientCmd (Cmd p cmd) = case clientParty p of
|
||||
clientCmd (Cmd p cmd) = case isClient p of
|
||||
Just Dict -> Right (ClientCmd p cmd)
|
||||
_ -> Left "not a client command"
|
||||
|
||||
@@ -430,13 +435,13 @@ serializeErrorType = bshow
|
||||
tPut :: Transport c => THandle c -> SentRawTransmission -> IO (Either TransportError ())
|
||||
tPut th (sig, t) = tPutEncrypted th $ C.serializeSignature sig <> " " <> serializeBody t
|
||||
|
||||
serializeTransmission :: CommandI c => Transmission c -> ByteString
|
||||
serializeTransmission (SessionId sessId, CorrId corrId, queueId, command) =
|
||||
serializeTransmission :: CommandI c => SessionId -> Transmission c -> ByteString
|
||||
serializeTransmission (SessionId sessId) (CorrId corrId, queueId, command) =
|
||||
B.unwords [sessId, corrId, encode queueId, serializeCommand command]
|
||||
|
||||
-- | Validate that it is an SMP client command, used with 'tGet' by 'Simplex.Messaging.Server'.
|
||||
fromClient :: Cmd -> Either ErrorType ClientCmd
|
||||
fromClient (Cmd p cmd) = case clientParty p of
|
||||
fromClient (Cmd p cmd) = case isClient p of
|
||||
Just Dict -> Right $ ClientCmd p cmd
|
||||
Nothing -> Left $ CMD PROHIBITED
|
||||
|
||||
@@ -455,27 +460,27 @@ tGetParse th = (first (const TEBadBlock) . A.parseOnly transmissionP =<<) <$> tG
|
||||
-- The first argument is used to limit allowed senders.
|
||||
-- 'fromClient' or 'fromServer' should be used here.
|
||||
tGet :: forall c m cmd. (Transport c, MonadIO m) => (Cmd -> Either ErrorType cmd) -> THandle c -> m (SignedTransmission cmd)
|
||||
tGet fromParty th@THandle {rcvSessionId, sndSessionId} = liftIO (tGetParse th) >>= decodeParseValidate
|
||||
tGet fromParty th@THandle {rcvSessionId} = liftIO (tGetParse th) >>= decodeParseValidate
|
||||
where
|
||||
decodeParseValidate :: Either TransportError RawTransmission -> m (SignedTransmission cmd)
|
||||
decodeParseValidate = \case
|
||||
Right RawTransmission {signature, signed, sessId, corrId, queueId, command}
|
||||
| SessionId sessId == rcvSessionId ->
|
||||
let decodedTransmission = liftM2 (,sessId,corrId,,command) (C.decodeSignature =<< decode signature) (decode queueId)
|
||||
let decodedTransmission = liftM2 (,corrId,,command) (C.decodeSignature =<< decode signature) (decode queueId)
|
||||
in either (const $ tError corrId) (tParseValidate signed) decodedTransmission
|
||||
| otherwise -> pure (Nothing, "", (sndSessionId, CorrId corrId, "", Left SESSION))
|
||||
| otherwise -> pure (Nothing, "", (CorrId corrId, "", Left SESSION))
|
||||
Left _ -> tError ""
|
||||
|
||||
tError :: ByteString -> m (SignedTransmission cmd)
|
||||
tError corrId = pure (Nothing, "", (sndSessionId, CorrId corrId, "", Left BLOCK))
|
||||
tError corrId = pure (Nothing, "", (CorrId corrId, "", Left BLOCK))
|
||||
|
||||
tParseValidate :: ByteString -> SignedRawTransmission -> m (SignedTransmission cmd)
|
||||
tParseValidate signed t@(sig, sessId, corrId, queueId, command) = do
|
||||
tParseValidate signed t@(sig, corrId, queueId, command) = do
|
||||
let cmd = parseCommand command >>= tCredentials t >>= fromParty
|
||||
return (sig, signed, (SessionId sessId, CorrId corrId, queueId, cmd))
|
||||
return (sig, signed, (CorrId corrId, queueId, cmd))
|
||||
|
||||
tCredentials :: SignedRawTransmission -> Cmd -> Either ErrorType Cmd
|
||||
tCredentials (sig, _, _, queueId, _) cmd = case cmd of
|
||||
tCredentials (sig, _, queueId, _) cmd = case cmd of
|
||||
-- IDS response must not have queue ID
|
||||
Cmd SBroker (IDS _) -> Right cmd
|
||||
-- ERR response does not always have queue ID
|
||||
|
||||
@@ -106,7 +106,7 @@ runSMPServerBlocking started cfg@ServerConfig {transports} = do
|
||||
join <$> mapM (endPreviousSubscriptions qId) (M.lookup qId serverSubs)
|
||||
endPreviousSubscriptions :: QueueId -> Client -> STM (Maybe s)
|
||||
endPreviousSubscriptions qId c = do
|
||||
writeTBQueue (sndQ c) (SessionId "", CorrId "", qId, END)
|
||||
writeTBQueue (sndQ c) (Just $ CP SRecipient, (CorrId "", qId, END))
|
||||
stateTVar (clientSubs c) $ \ss -> (M.lookup qId ss, M.delete qId ss)
|
||||
|
||||
runClient :: (Transport c, MonadUnliftIO m, MonadReader Env m) => TProxy c -> c -> m ()
|
||||
@@ -135,35 +135,56 @@ cancelSub = \case
|
||||
_ -> return ()
|
||||
|
||||
receive :: (Transport c, MonadUnliftIO m, MonadReader Env m) => THandle c -> Client -> m ()
|
||||
receive h@THandle {sndSessionId} Client {rcvQ, sndQ} = forever $ do
|
||||
(sig, signed, (sessId, corrId, queueId, cmdOrError)) <- tGet fromClient h
|
||||
receive th Client {rcvQ, sndQ} = forever $ do
|
||||
(sig, signed, (corrId, queueId, cmdOrError)) <- tGet fromClient th
|
||||
case cmdOrError of
|
||||
Left e -> write sndQ (sndSessionId, corrId, queueId, ERR e)
|
||||
Left e -> write sndQ (Nothing, (corrId, queueId, ERR e))
|
||||
Right cmd -> do
|
||||
verified <- verifyTransmission sig signed queueId cmd
|
||||
if verified
|
||||
then write rcvQ (sessId, corrId, queueId, cmd)
|
||||
else write sndQ (sndSessionId, corrId, queueId, ERR AUTH)
|
||||
then write rcvQ (corrId, queueId, cmd)
|
||||
else write sndQ (Nothing, (corrId, queueId, ERR AUTH))
|
||||
where
|
||||
write q t = atomically $ writeTBQueue q t
|
||||
|
||||
send :: (Transport c, MonadUnliftIO m) => THandle c -> Client -> m ()
|
||||
send h Client {sndQ} = forever $ do
|
||||
t <- atomically $ readTBQueue sndQ
|
||||
-- TODO sign it here?
|
||||
liftIO $ tPut h (Nothing, serializeTransmission t)
|
||||
send :: (Transport c, MonadUnliftIO m, MonadReader Env m) => THandle c -> Client -> m ()
|
||||
send h Client {sndQ, sndSessionId} = forever $ do
|
||||
atomically (readTBQueue sndQ)
|
||||
>>= signTransmission sndSessionId
|
||||
>>= liftIO . tPut h
|
||||
|
||||
signTransmission ::
|
||||
forall m. (MonadUnliftIO m, MonadReader Env m) => SessionId -> (Maybe ClientParty, BrokerTransmission) -> m SentRawTransmission
|
||||
signTransmission sndSessionId (party_, t@(_, queueId, cmd)) =
|
||||
case party_ of
|
||||
Nothing -> unsigned
|
||||
Just (CP SNotifier) -> unsigned
|
||||
Just party ->
|
||||
case cmd of
|
||||
ERR QUOTA -> signed party
|
||||
ERR _ -> unsigned
|
||||
PONG -> unsigned
|
||||
_ -> signed party
|
||||
where
|
||||
s = serializeTransmission sndSessionId t
|
||||
unsigned = pure (Nothing, s)
|
||||
signed :: ClientParty -> m SentRawTransmission
|
||||
signed party = do
|
||||
st <- asks queueStore
|
||||
q <- atomically $ getQueue st party queueId
|
||||
pure (Nothing, s)
|
||||
|
||||
verifyTransmission ::
|
||||
forall m. (MonadUnliftIO m, MonadReader Env m) => Maybe C.ASignature -> ByteString -> QueueId -> ClientCmd -> m Bool
|
||||
verifyTransmission sig_ signed queueId cmd = do
|
||||
case cmd of
|
||||
ClientCmd SRecipient (NEW k _) -> pure $ verifySignature k
|
||||
ClientCmd SRecipient _ -> verifyCmd SRecipient $ verifySignature . recipientKey
|
||||
ClientCmd SSender (SEND _) -> verifyCmd SSender $ verifyMaybe . senderKey
|
||||
ClientCmd SRecipient _ -> verifyCmd (CP SRecipient) $ verifySignature . recipientKey
|
||||
ClientCmd SSender (SEND _) -> verifyCmd (CP SSender) $ verifyMaybe . senderKey
|
||||
ClientCmd SSender PING -> pure True
|
||||
ClientCmd SNotifier NSUB -> verifyCmd SNotifier $ verifyMaybe . fmap snd . notifier
|
||||
ClientCmd SNotifier NSUB -> verifyCmd (CP SNotifier) $ verifyMaybe . fmap snd . notifier
|
||||
where
|
||||
verifyCmd :: SParty p -> (QueueRec -> Bool) -> m Bool
|
||||
verifyCmd :: ClientParty -> (QueueRec -> Bool) -> m Bool
|
||||
verifyCmd party f = do
|
||||
st <- asks queueStore
|
||||
q <- atomically $ getQueue st party queueId
|
||||
@@ -212,28 +233,29 @@ dummyKey512 :: C.PublicKey 'C.RSA
|
||||
dummyKey512 = "MIICoDANBgkqhkiG9w0BAQEFAAOCAo0AMIICiAKCAgEArkCY9DuverJ4mmzDektv9aZMFyeRV46WZK9NsOBKEc+1ncqMs+LhLti9asKNgUBRbNzmbOe0NYYftrUpwnATaenggkTFxxbJ4JGJuGYbsEdFWkXSvrbWGtM8YUmn5RkAGme12xQ89bSM4VoJAGnrYPHwmcQd+KYCPZvTUsxaxgrJTX65ejHN9BsAn8XtGViOtHTDJO9yUMD2WrJvd7wnNa+0ugEteDLzMU++xS98VC+uA1vfauUqi3yXVchdfrLdVUuM+JE0gUEXCgzjuHkaoHiaGNiGhdPYoAJJdOKQOIHAKdk7Th6OPhirPhc9XYNB4O8JDthKhNtfokvFIFlC4QBRzJhpLIENaEBDt08WmgpOnecZB/CuxkqqOrNa8j5K5jNrtXAI67W46VEC2jeQy/gZwb64Zit2A4D00xXzGbQTPGj4ehcEMhLx5LSCygViEf0w0tN3c3TEyUcgPzvECd2ZVpQLr9Z4a07Ebr+YSuxcHhjg4Rg1VyJyOTTvaCBGm5X2B3+tI4NUttmikIHOYpBnsLmHY2BgfH2KcrIsDyAhInXmTFr/L2+erFarUnlfATd2L8Ti43TNHDedO6k6jI5Gyi62yPwjqPLEIIK8l+pIeNfHJ3pPmjhHBfzFcQLMMMXffHWNK8kWklrQXK+4j4HiPcTBvlO1FEtG9nEIZhUCgYA4a6WtI2k5YNli1C89GY5rGUY7RP71T6RWri/D3Lz9T7GvU+FemAyYmsvCQwqijUOur0uLvwSP8VdxpSUcrjJJSWur2hrPWzWlu0XbNaeizxpFeKbQP+zSrWJ1z8RwfAeUjShxt8q1TuqGqY10wQyp3nyiTGvS+KwZVj5h5qx8NQ=="
|
||||
|
||||
client :: forall m. (MonadUnliftIO m, MonadReader Env m) => Client -> Server -> m ()
|
||||
client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sndSessionId} Server {subscribedQ, ntfSubscribedQ, notifiers} =
|
||||
client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ} Server {subscribedQ, ntfSubscribedQ, notifiers} =
|
||||
forever $
|
||||
atomically (readTBQueue rcvQ)
|
||||
>>= processCommand
|
||||
>>= atomically . writeTBQueue sndQ
|
||||
where
|
||||
processCommand :: Transmission ClientCmd -> m BrokerTransmission
|
||||
processCommand (_, corrId, queueId, cmd) = do
|
||||
processCommand :: Transmission ClientCmd -> m (Maybe ClientParty, BrokerTransmission)
|
||||
processCommand (corrId, queueId, cmd) = do
|
||||
st <- asks queueStore
|
||||
case cmd of
|
||||
ClientCmd SSender command -> case command of
|
||||
SEND msgBody -> sendMessage st msgBody
|
||||
PING -> pure (sndSessionId, corrId, queueId, PONG)
|
||||
ClientCmd SNotifier NSUB -> subscribeNotifications
|
||||
ClientCmd SRecipient command -> case command of
|
||||
NEW rKey dhKey -> createQueue st rKey dhKey
|
||||
SUB -> subscribeQueue queueId
|
||||
ACK -> acknowledgeMsg
|
||||
KEY sKey -> secureQueue_ st sKey
|
||||
NKEY nKey -> addQueueNotifier_ st nKey
|
||||
OFF -> suspendQueue_ st
|
||||
DEL -> delQueueAndMsgs st
|
||||
SEND msgBody -> (Just $ CP SSender,) <$> sendMessage st msgBody
|
||||
PING -> pure (Nothing, (corrId, queueId, PONG))
|
||||
ClientCmd SNotifier NSUB -> (Just $ CP SNotifier,) <$> subscribeNotifications
|
||||
ClientCmd SRecipient command ->
|
||||
(Just $ CP SRecipient,) <$> case command of
|
||||
NEW rKey dhKey -> createQueue st rKey dhKey
|
||||
SUB -> subscribeQueue queueId
|
||||
ACK -> acknowledgeMsg
|
||||
KEY sKey -> secureQueue_ st sKey
|
||||
NKEY nKey -> addQueueNotifier_ st nKey
|
||||
OFF -> suspendQueue_ st
|
||||
DEL -> delQueueAndMsgs st
|
||||
where
|
||||
createQueue :: QueueStore -> RcvPublicVerifyKey -> RcvPublicDhKey -> m BrokerTransmission
|
||||
createQueue st recipientKey dhKey = checkKeySize recipientKey $ do
|
||||
@@ -272,7 +294,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sndSessionId} S
|
||||
|
||||
logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO ()
|
||||
logCreateById s rId =
|
||||
atomically (getQueue st SRecipient rId) >>= \case
|
||||
atomically (getQueue st (CP SRecipient) rId) >>= \case
|
||||
Right q -> logCreateQueue s q
|
||||
_ -> pure ()
|
||||
|
||||
@@ -302,7 +324,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sndSessionId} S
|
||||
|
||||
checkKeySize :: Monad m' => C.APublicVerifyKey -> m' (Command 'Broker) -> m' BrokerTransmission
|
||||
checkKeySize key action =
|
||||
(sndSessionId,corrId,queueId,)
|
||||
(corrId,queueId,)
|
||||
<$> if C.validKeySize key
|
||||
then action
|
||||
else pure . ERR $ CMD KEY_SIZE
|
||||
@@ -347,7 +369,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sndSessionId} S
|
||||
|
||||
sendMessage :: QueueStore -> MsgBody -> m BrokerTransmission
|
||||
sendMessage st msgBody = do
|
||||
qr <- atomically $ getQueue st SSender queueId
|
||||
qr <- atomically $ getQueue st (CP SSender) queueId
|
||||
either (return . err) storeMessage qr
|
||||
where
|
||||
storeMessage :: QueueRec -> m BrokerTransmission
|
||||
@@ -377,9 +399,9 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sndSessionId} S
|
||||
mapM_ (writeNtf nId) . M.lookup nId =<< readTVar notifiers
|
||||
|
||||
writeNtf :: NotifierId -> Client -> STM ()
|
||||
writeNtf nId Client {sndQ = q, sndSessionId = sessId} =
|
||||
writeNtf nId Client {sndQ = q} =
|
||||
unlessM (isFullTBQueue sndQ) $
|
||||
writeTBQueue q (sessId, CorrId "", nId, NMSG)
|
||||
writeTBQueue q (Just $ CP SNotifier, (CorrId "", nId, NMSG))
|
||||
|
||||
deliverMessage :: (MsgQueue -> STM (Maybe Message)) -> RecipientId -> Sub -> m BrokerTransmission
|
||||
deliverMessage tryPeek rId = \case
|
||||
@@ -389,8 +411,8 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sndSessionId} S
|
||||
q <- atomically $ getMsgQueue ms rId quota
|
||||
atomically (tryPeek q) >>= \case
|
||||
Nothing -> forkSub q $> ok
|
||||
Just msg -> atomically setDelivered $> (sndSessionId, corrId, rId, msgCmd msg)
|
||||
_ -> return ok
|
||||
Just msg -> atomically setDelivered $> (corrId, rId, msgCmd msg)
|
||||
_ -> pure ok
|
||||
where
|
||||
forkSub :: MsgQueue -> m ()
|
||||
forkSub q = do
|
||||
@@ -403,7 +425,7 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sndSessionId} S
|
||||
subscriber :: MsgQueue -> m ()
|
||||
subscriber q = atomically $ do
|
||||
msg <- peekMsg q
|
||||
writeTBQueue sndQ (sndSessionId, CorrId "", rId, msgCmd msg)
|
||||
writeTBQueue sndQ (Just $ CP SRecipient, (CorrId "", rId, msgCmd msg))
|
||||
setSub (\s -> s {subThread = NoSub})
|
||||
void setDelivered
|
||||
|
||||
@@ -422,14 +444,14 @@ client clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sndSessionId} S
|
||||
ms <- asks msgStore
|
||||
atomically $
|
||||
deleteQueue st queueId >>= \case
|
||||
Left e -> return $ err e
|
||||
Left e -> pure $ err e
|
||||
Right _ -> delMsgQueue ms queueId $> ok
|
||||
|
||||
ok :: BrokerTransmission
|
||||
ok = (sndSessionId, corrId, queueId, OK)
|
||||
ok = (corrId, queueId, OK)
|
||||
|
||||
err :: ErrorType -> BrokerTransmission
|
||||
err e = (sndSessionId, corrId, queueId, ERR e)
|
||||
err e = (corrId, queueId, ERR e)
|
||||
|
||||
okResp :: Either ErrorType () -> BrokerTransmission
|
||||
okResp = either err $ const ok
|
||||
|
||||
@@ -59,7 +59,7 @@ data Client = Client
|
||||
{ subscriptions :: TVar (Map RecipientId Sub),
|
||||
ntfSubscriptions :: TVar (Map NotifierId ()),
|
||||
rcvQ :: TBQueue (Transmission ClientCmd),
|
||||
sndQ :: TBQueue BrokerTransmission,
|
||||
sndQ :: TBQueue (Maybe ClientParty, BrokerTransmission),
|
||||
sndSessionId :: SessionId
|
||||
}
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ data QueueStatus = QueueActive | QueueOff deriving (Eq)
|
||||
|
||||
class MonadQueueStore s m where
|
||||
addQueue :: s -> QueueRec -> m (Either ErrorType ())
|
||||
getQueue :: s -> SParty (a :: Party) -> QueueId -> m (Either ErrorType QueueRec)
|
||||
getQueue :: s -> ClientParty -> QueueId -> m (Either ErrorType QueueRec)
|
||||
secureQueue :: s -> RecipientId -> SndPublicVerifyKey -> m (Either ErrorType ())
|
||||
addQueueNotifier :: s -> RecipientId -> NotifierId -> NtfPublicVerifyKey -> m (Either ErrorType ())
|
||||
suspendQueue :: s -> RecipientId -> m (Either ErrorType ())
|
||||
|
||||
@@ -42,14 +42,13 @@ instance MonadQueueStore QueueStore STM where
|
||||
}
|
||||
return $ Right ()
|
||||
|
||||
getQueue :: QueueStore -> SParty (p :: Party) -> QueueId -> STM (Either ErrorType QueueRec)
|
||||
getQueue st party qId = do
|
||||
getQueue :: QueueStore -> ClientParty -> QueueId -> STM (Either ErrorType QueueRec)
|
||||
getQueue st (CP party) qId = do
|
||||
cs <- readTVar st
|
||||
pure $ case party of
|
||||
SRecipient -> getRcpQueue cs qId
|
||||
SSender -> getPartyQueue cs senders
|
||||
SNotifier -> getPartyQueue cs notifiers
|
||||
SBroker -> Left INTERNAL
|
||||
where
|
||||
getPartyQueue ::
|
||||
QueueStoreData ->
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE NumericUnderscores #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
@@ -171,11 +172,11 @@ smpTest4 _ test' = smpTestN 4 _test
|
||||
_test _ = error "expected 4 handles"
|
||||
|
||||
tPutRaw :: Transport c => THandle c -> SignedRawTransmission -> IO ()
|
||||
tPutRaw h (sig, sessId, corrId, queueId, command) = do
|
||||
tPutRaw h@THandle {sndSessionId = SessionId sessId} (sig, corrId, queueId, command) = do
|
||||
let t = B.unwords [sessId, corrId, queueId, command]
|
||||
void $ tPut h (sig, t)
|
||||
|
||||
tGetRaw :: Transport c => THandle c -> IO SignedRawTransmission
|
||||
tGetRaw h = do
|
||||
(Nothing, _, (SessionId sessId, CorrId corrId, qId, Right cmd)) <- tGet fromServer h
|
||||
pure (Nothing, sessId, corrId, encode qId, serializeCommand cmd)
|
||||
(Nothing, _, (CorrId corrId, qId, Right cmd)) <- tGet fromServer h
|
||||
pure (Nothing, corrId, encode qId, serializeCommand cmd)
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
{-# LANGUAGE DataKinds #-}
|
||||
{-# LANGUAGE GADTs #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE PatternSynonyms #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
@@ -42,14 +41,14 @@ serverTests t = do
|
||||
describe "Message notifications" $ testMessageNotifications t
|
||||
|
||||
pattern Resp :: CorrId -> QueueId -> Command 'Broker -> SignedTransmission (Command 'Broker)
|
||||
pattern Resp corrId queueId command <- ("", _, (_, corrId, queueId, Right command))
|
||||
pattern Resp corrId queueId command <- ("", _, (corrId, queueId, Right command))
|
||||
|
||||
pattern Ids :: RecipientId -> SenderId -> RcvPublicDhKey -> Command 'Broker
|
||||
pattern Ids rId sId srvDh <- IDS (QIK rId _ srvDh sId _)
|
||||
|
||||
sendRecv :: Transport c => THandle c -> (Maybe C.ASignature, ByteString, ByteString, ByteString) -> IO (SignedTransmission (Command 'Broker))
|
||||
sendRecv h@THandle {sndSessionId = SessionId sessId} (sgn, corrId, qId, cmd) =
|
||||
tPutRaw h (sgn, sessId, corrId, encode qId, cmd) >> tGet fromServer h
|
||||
sendRecv h (sgn, corrId, qId, cmd) =
|
||||
tPutRaw h (sgn, corrId, encode qId, cmd) >> tGet fromServer h
|
||||
|
||||
signSendRecv :: Transport c => THandle c -> C.APrivateSignKey -> (ByteString, ByteString, ByteString) -> IO (SignedTransmission (Command 'Broker))
|
||||
signSendRecv h@THandle {sndSessionId = SessionId sessId} pk (corrId, qId, cmd) = do
|
||||
@@ -484,40 +483,40 @@ sampleSig = "gM8qn2Vx3GkhIp2hgrji9uhfXKpgtKDmc0maxdP8GvbORUxMCTlLG8Q/gNcl3pQVOzm
|
||||
|
||||
syntaxTests :: ATransport -> Spec
|
||||
syntaxTests (ATransport t) = do
|
||||
it "unknown command" $ ("", "", "abcd", "1234", "HELLO") >#> ("", "", "abcd", "1234", "ERR CMD SYNTAX")
|
||||
it "unknown command" $ ("", "abcd", "1234", "HELLO") >#> ("", "abcd", "1234", "ERR CMD SYNTAX")
|
||||
describe "NEW" $ do
|
||||
it "no parameters" $ (sampleSig, "", "bcda", "", "NEW") >#> ("", "", "bcda", "", "ERR CMD SYNTAX")
|
||||
it "many parameters" $ (sampleSig, "", "cdab", "", B.unwords ["NEW 1", samplePubKey, sampleDhPubKey]) >#> ("", "", "cdab", "", "ERR CMD SYNTAX")
|
||||
it "no signature" $ ("", "", "dabc", "", B.unwords ["NEW", samplePubKey, sampleDhPubKey]) >#> ("", "", "dabc", "", "ERR CMD NO_AUTH")
|
||||
it "queue ID" $ (sampleSig, "", "abcd", "12345678", B.unwords ["NEW", samplePubKey, sampleDhPubKey]) >#> ("", "", "abcd", "12345678", "ERR CMD HAS_AUTH")
|
||||
it "no parameters" $ (sampleSig, "bcda", "", "NEW") >#> ("", "bcda", "", "ERR CMD SYNTAX")
|
||||
it "many parameters" $ (sampleSig, "cdab", "", B.unwords ["NEW 1", samplePubKey, sampleDhPubKey]) >#> ("", "cdab", "", "ERR CMD SYNTAX")
|
||||
it "no signature" $ ("", "dabc", "", B.unwords ["NEW", samplePubKey, sampleDhPubKey]) >#> ("", "dabc", "", "ERR CMD NO_AUTH")
|
||||
it "queue ID" $ (sampleSig, "abcd", "12345678", B.unwords ["NEW", samplePubKey, sampleDhPubKey]) >#> ("", "abcd", "12345678", "ERR CMD HAS_AUTH")
|
||||
describe "KEY" $ do
|
||||
it "valid syntax" $ (sampleSig, "", "bcda", "12345678", "KEY " <> samplePubKey) >#> ("", "", "bcda", "12345678", "ERR AUTH")
|
||||
it "no parameters" $ (sampleSig, "", "cdab", "12345678", "KEY") >#> ("", "", "cdab", "12345678", "ERR CMD SYNTAX")
|
||||
it "many parameters" $ (sampleSig, "", "dabc", "12345678", "KEY 1 " <> samplePubKey) >#> ("", "", "dabc", "12345678", "ERR CMD SYNTAX")
|
||||
it "no signature" $ ("", "", "abcd", "12345678", "KEY " <> samplePubKey) >#> ("", "", "abcd", "12345678", "ERR CMD NO_AUTH")
|
||||
it "no queue ID" $ (sampleSig, "", "bcda", "", "KEY " <> samplePubKey) >#> ("", "", "bcda", "", "ERR CMD NO_AUTH")
|
||||
it "valid syntax" $ (sampleSig, "bcda", "12345678", "KEY " <> samplePubKey) >#> ("", "bcda", "12345678", "ERR AUTH")
|
||||
it "no parameters" $ (sampleSig, "cdab", "12345678", "KEY") >#> ("", "cdab", "12345678", "ERR CMD SYNTAX")
|
||||
it "many parameters" $ (sampleSig, "dabc", "12345678", "KEY 1 " <> samplePubKey) >#> ("", "dabc", "12345678", "ERR CMD SYNTAX")
|
||||
it "no signature" $ ("", "abcd", "12345678", "KEY " <> samplePubKey) >#> ("", "abcd", "12345678", "ERR CMD NO_AUTH")
|
||||
it "no queue ID" $ (sampleSig, "bcda", "", "KEY " <> samplePubKey) >#> ("", "bcda", "", "ERR CMD NO_AUTH")
|
||||
noParamsSyntaxTest "SUB"
|
||||
noParamsSyntaxTest "ACK"
|
||||
noParamsSyntaxTest "OFF"
|
||||
noParamsSyntaxTest "DEL"
|
||||
describe "SEND" $ do
|
||||
it "valid syntax 1" $ (sampleSig, "", "cdab", "12345678", "SEND 5 hello ") >#> ("", "", "cdab", "12345678", "ERR AUTH")
|
||||
it "valid syntax 2" $ (sampleSig, "", "dabc", "12345678", "SEND 11 hello there ") >#> ("", "", "dabc", "12345678", "ERR AUTH")
|
||||
it "no parameters" $ (sampleSig, "", "abcd", "12345678", "SEND") >#> ("", "", "abcd", "12345678", "ERR CMD SYNTAX")
|
||||
it "no queue ID" $ (sampleSig, "", "bcda", "", "SEND 5 hello ") >#> ("", "", "bcda", "", "ERR CMD NO_QUEUE")
|
||||
it "bad message body 1" $ (sampleSig, "", "cdab", "12345678", "SEND 11 hello ") >#> ("", "", "cdab", "12345678", "ERR CMD SYNTAX")
|
||||
it "bad message body 2" $ (sampleSig, "", "dabc", "12345678", "SEND hello ") >#> ("", "", "dabc", "12345678", "ERR CMD SYNTAX")
|
||||
it "bigger body" $ (sampleSig, "", "abcd", "12345678", "SEND 4 hello ") >#> ("", "", "abcd", "12345678", "ERR CMD SYNTAX")
|
||||
it "valid syntax 1" $ (sampleSig, "cdab", "12345678", "SEND 5 hello ") >#> ("", "cdab", "12345678", "ERR AUTH")
|
||||
it "valid syntax 2" $ (sampleSig, "dabc", "12345678", "SEND 11 hello there ") >#> ("", "dabc", "12345678", "ERR AUTH")
|
||||
it "no parameters" $ (sampleSig, "abcd", "12345678", "SEND") >#> ("", "abcd", "12345678", "ERR CMD SYNTAX")
|
||||
it "no queue ID" $ (sampleSig, "bcda", "", "SEND 5 hello ") >#> ("", "bcda", "", "ERR CMD NO_QUEUE")
|
||||
it "bad message body 1" $ (sampleSig, "cdab", "12345678", "SEND 11 hello ") >#> ("", "cdab", "12345678", "ERR CMD SYNTAX")
|
||||
it "bad message body 2" $ (sampleSig, "dabc", "12345678", "SEND hello ") >#> ("", "dabc", "12345678", "ERR CMD SYNTAX")
|
||||
it "bigger body" $ (sampleSig, "abcd", "12345678", "SEND 4 hello ") >#> ("", "abcd", "12345678", "ERR CMD SYNTAX")
|
||||
describe "PING" $ do
|
||||
it "valid syntax" $ ("", "", "abcd", "", "PING") >#> ("", "", "abcd", "", "PONG")
|
||||
it "valid syntax" $ ("", "abcd", "", "PING") >#> ("", "abcd", "", "PONG")
|
||||
describe "broker response not allowed" $ do
|
||||
it "OK" $ (sampleSig, "", "bcda", "12345678", "OK") >#> ("", "", "bcda", "12345678", "ERR CMD PROHIBITED")
|
||||
it "OK" $ (sampleSig, "bcda", "12345678", "OK") >#> ("", "bcda", "12345678", "ERR CMD PROHIBITED")
|
||||
where
|
||||
noParamsSyntaxTest :: ByteString -> Spec
|
||||
noParamsSyntaxTest cmd = describe (B.unpack cmd) $ do
|
||||
it "valid syntax" $ (sampleSig, "", "abcd", "12345678", cmd) >#> ("", "", "abcd", "12345678", "ERR AUTH")
|
||||
it "wrong terminator" $ (sampleSig, "", "bcda", "12345678", cmd <> "=") >#> ("", "", "bcda", "12345678", "ERR CMD SYNTAX")
|
||||
it "no signature" $ ("", "", "cdab", "12345678", cmd) >#> ("", "", "cdab", "12345678", "ERR CMD NO_AUTH")
|
||||
it "no queue ID" $ (sampleSig, "", "dabc", "", cmd) >#> ("", "", "dabc", "", "ERR CMD NO_AUTH")
|
||||
it "valid syntax" $ (sampleSig, "abcd", "12345678", cmd) >#> ("", "abcd", "12345678", "ERR AUTH")
|
||||
it "wrong terminator" $ (sampleSig, "bcda", "12345678", cmd <> "=") >#> ("", "bcda", "12345678", "ERR CMD SYNTAX")
|
||||
it "no signature" $ ("", "cdab", "12345678", cmd) >#> ("", "cdab", "12345678", "ERR CMD NO_AUTH")
|
||||
it "no queue ID" $ (sampleSig, "dabc", "", cmd) >#> ("", "dabc", "", "ERR CMD NO_AUTH")
|
||||
(>#>) :: SignedRawTransmission -> SignedRawTransmission -> Expectation
|
||||
command >#> response = smpServerTest t command `shouldReturn` response
|
||||
|
||||
Reference in New Issue
Block a user