mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-03-30 14:16:00 +00:00
notifications: protocol, server (#335)
* notifications: protocol * update protocol to include subscription ID and DH public key * update protocol, started server * add notification server subscription DH key * use the same command type in notifications protocol, protocol parsing, server frame * remove empty files
This commit is contained in:
committed by
GitHub
parent
f2409e7280
commit
f466fa76e5
@@ -770,7 +770,7 @@ The syntax for error responses:
|
||||
```abnf
|
||||
error = %s"ERR " errorType
|
||||
errorType = %s"BLOCK" / %s"SESSION" / %s"CMD " cmdError / %s"AUTH" / %s"LARGE_MSG" /%s"INTERNAL"
|
||||
cmdError = %s"SYNTAX" / %s"PROHIBITED" / %s"NO_AUTH" / %s"HAS_AUTH" / %s"NO_QUEUE"
|
||||
cmdError = %s"SYNTAX" / %s"PROHIBITED" / %s"NO_AUTH" / %s"HAS_AUTH" / %s"NO_ENTITY"
|
||||
```
|
||||
|
||||
Server implementations must aim to respond within the same time for each command in all cases when `"ERR AUTH"` response is required to prevent timing attacks (e.g., the server should perform signature verification even when the queue does not exist on the server or the signature of different size is sent, using any RSA key with the same size as the signature size).
|
||||
|
||||
@@ -47,6 +47,13 @@ library
|
||||
Simplex.Messaging.Crypto.Ratchet
|
||||
Simplex.Messaging.Encoding
|
||||
Simplex.Messaging.Encoding.String
|
||||
Simplex.Messaging.Notifications.Client
|
||||
Simplex.Messaging.Notifications.Client.Env
|
||||
Simplex.Messaging.Notifications.Protocol
|
||||
Simplex.Messaging.Notifications.Server
|
||||
Simplex.Messaging.Notifications.Server.Env
|
||||
Simplex.Messaging.Notifications.Server.Subscriptions
|
||||
Simplex.Messaging.Notifications.Transport
|
||||
Simplex.Messaging.Parsers
|
||||
Simplex.Messaging.Protocol
|
||||
Simplex.Messaging.Server
|
||||
|
||||
@@ -539,7 +539,6 @@ data SMPQueueUri = SMPQueueUri
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
-- TODO change SMP queue URI format to include version range and allow unknown parameters
|
||||
instance StrEncoding SMPQueueUri where
|
||||
-- v1 uses short SMP queue URI format
|
||||
strEncode SMPQueueUri {smpServer = srv, senderId = qId, clientVRange = _vr, dhPublicKey = k} =
|
||||
|
||||
@@ -28,6 +28,7 @@ CREATE TABLE ntf_servers (
|
||||
CREATE TABLE ntf_subscriptions (
|
||||
ntf_host TEXT NOT NULL,
|
||||
ntf_port TEXT NOT NULL,
|
||||
ntf_sub_id BLOB NOT NULL,
|
||||
ntf_sub_status TEXT NOT NULL, -- new, created, active, pending, error_auth
|
||||
ntf_sub_action TEXT, -- if there is an action required on this subscription: create / check / token / delete
|
||||
ntf_sub_action_ts TEXT, -- the earliest time for the action, e.g. checks can be scheduled every X hours
|
||||
@@ -37,7 +38,7 @@ CREATE TABLE ntf_subscriptions (
|
||||
smp_ntf_id BLOB NOT NULL,
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL, -- this is to check subscription status periodically to know when it was last checked
|
||||
PRIMARY KEY (ntf_host, ntf_port, smp_host, smp_port, smp_ntf_id),
|
||||
PRIMARY KEY (ntf_host, ntf_port, ntf_sub_id),
|
||||
FOREIGN KEY (ntf_host, ntf_port) REFERENCES ntf_servers
|
||||
ON DELETE RESTRICT ON UPDATE CASCADE,
|
||||
FOREIGN KEY (smp_host, smp_port, smp_ntf_id) REFERENCES rcv_queues (host, port, ntf_id)
|
||||
|
||||
@@ -63,8 +63,8 @@ import Network.Socket (ServiceName)
|
||||
import Numeric.Natural
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Protocol
|
||||
import Simplex.Messaging.Transport (ATransport (..), THandle (..), TLS, TProxy, Transport (..), TransportError, clientHandshake)
|
||||
import Simplex.Messaging.Transport.Client (runTransportClient)
|
||||
import Simplex.Messaging.Transport (ATransport (..), THandle (..), TLS, TProxy, Transport (..), TransportError)
|
||||
import Simplex.Messaging.Transport.Client (runTransportClient, smpClientHandshake)
|
||||
import Simplex.Messaging.Transport.KeepAlive
|
||||
import Simplex.Messaging.Transport.WebSockets (WS)
|
||||
import Simplex.Messaging.Util (bshow, liftError, raceAny_)
|
||||
@@ -175,7 +175,7 @@ getSMPClient smpServer cfg@SMPClientConfig {qSize, tcpTimeout, tcpKeepAlive, smp
|
||||
|
||||
client :: forall c. Transport c => TProxy c -> SMPClient -> TMVar (Either SMPClientError (THandle c)) -> c -> IO ()
|
||||
client _ c thVar h =
|
||||
runExceptT (clientHandshake h $ keyHash smpServer) >>= \case
|
||||
runExceptT (smpClientHandshake h $ keyHash smpServer) >>= \case
|
||||
Left e -> atomically . putTMVar thVar . Left $ SMPTransportError e
|
||||
Right th@THandle {sessionId} -> do
|
||||
atomically $ do
|
||||
|
||||
@@ -284,6 +284,12 @@ instance Eq APrivateSignKey where
|
||||
|
||||
deriving instance Show APrivateSignKey
|
||||
|
||||
instance Encoding APrivateSignKey where
|
||||
smpEncode = smpEncode . encodePrivKey
|
||||
{-# INLINE smpEncode #-}
|
||||
smpDecode = decodePrivKey
|
||||
{-# INLINE smpDecode #-}
|
||||
|
||||
data APublicVerifyKey
|
||||
= forall a.
|
||||
(AlgorithmI a, SignatureAlgorithm a) =>
|
||||
|
||||
@@ -141,3 +141,7 @@ instance (Encoding a, Encoding b, Encoding c, Encoding d) => Encoding (a, b, c,
|
||||
instance (Encoding a, Encoding b, Encoding c, Encoding d, Encoding e) => Encoding (a, b, c, d, e) where
|
||||
smpEncode (a, b, c, d, e) = smpEncode a <> smpEncode b <> smpEncode c <> smpEncode d <> smpEncode e
|
||||
smpP = (,,,,) <$> smpP <*> smpP <*> smpP <*> smpP <*> smpP
|
||||
|
||||
instance (Encoding a, Encoding b, Encoding c, Encoding d, Encoding e, Encoding f) => Encoding (a, b, c, d, e, f) where
|
||||
smpEncode (a, b, c, d, e, f) = smpEncode a <> smpEncode b <> smpEncode c <> smpEncode d <> smpEncode e <> smpEncode f
|
||||
smpP = (,,,,,) <$> smpP <*> smpP <*> smpP <*> smpP <*> smpP <*> smpP
|
||||
|
||||
161
src/Simplex/Messaging/Notifications/Protocol.hs
Normal file
161
src/Simplex/Messaging/Notifications/Protocol.hs
Normal file
@@ -0,0 +1,161 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE TypeFamilies #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Protocol where
|
||||
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Maybe (isNothing)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Encoding
|
||||
import Simplex.Messaging.Protocol
|
||||
|
||||
data NtfCommandTag
|
||||
= NCCreate_
|
||||
| NCCheck_
|
||||
| NCToken_
|
||||
| NCDelete_
|
||||
deriving (Show)
|
||||
|
||||
instance Encoding NtfCommandTag where
|
||||
smpEncode = \case
|
||||
NCCreate_ -> "CREATE"
|
||||
NCCheck_ -> "CHECK"
|
||||
NCToken_ -> "TOKEN"
|
||||
NCDelete_ -> "DELETE"
|
||||
smpP = messageTagP
|
||||
|
||||
instance ProtocolMsgTag NtfCommandTag where
|
||||
decodeTag = \case
|
||||
"CREATE" -> Just NCCreate_
|
||||
"CHECK" -> Just NCCheck_
|
||||
"TOKEN" -> Just NCToken_
|
||||
"DELETE" -> Just NCDelete_
|
||||
_ -> Nothing
|
||||
|
||||
data NtfCommand
|
||||
= NCCreate DeviceToken SMPQueueNtfUri C.APublicVerifyKey C.PublicKeyX25519
|
||||
| NCCheck
|
||||
| NCToken DeviceToken
|
||||
| NCDelete
|
||||
|
||||
instance Protocol NtfCommand where
|
||||
type Tag NtfCommand = NtfCommandTag
|
||||
encodeProtocol = \case
|
||||
NCCreate token smpQueue verifyKey dhKey -> e (NCCreate_, ' ', token, smpQueue, verifyKey, dhKey)
|
||||
NCCheck -> e NCCheck_
|
||||
NCToken token -> e (NCToken_, ' ', token)
|
||||
NCDelete -> e NCDelete_
|
||||
where
|
||||
e :: Encoding a => a -> ByteString
|
||||
e = smpEncode
|
||||
|
||||
protocolP = \case
|
||||
NCCreate_ -> NCCreate <$> _smpP <*> smpP <*> smpP <*> smpP
|
||||
NCCheck_ -> pure NCCheck
|
||||
NCToken_ -> NCToken <$> _smpP
|
||||
NCDelete_ -> pure NCDelete
|
||||
|
||||
checkCredentials (sig, _, subId, _) cmd = case cmd of
|
||||
-- CREATE must have signature but NOT subscription ID
|
||||
NCCreate {}
|
||||
| isNothing sig -> Left $ CMD NO_AUTH
|
||||
| not (B.null subId) -> Left $ CMD HAS_AUTH
|
||||
| otherwise -> Right cmd
|
||||
-- other client commands must have both signature and subscription ID
|
||||
_
|
||||
| isNothing sig || B.null subId -> Left $ CMD NO_AUTH
|
||||
| otherwise -> Right cmd
|
||||
|
||||
data NtfResponseTag
|
||||
= NRSubId_
|
||||
| NROk_
|
||||
| NRErr_
|
||||
| NRStat_
|
||||
deriving (Show)
|
||||
|
||||
instance Encoding NtfResponseTag where
|
||||
smpEncode = \case
|
||||
NRSubId_ -> "ID"
|
||||
NROk_ -> "OK"
|
||||
NRErr_ -> "ERR"
|
||||
NRStat_ -> "STAT"
|
||||
smpP = messageTagP
|
||||
|
||||
instance ProtocolMsgTag NtfResponseTag where
|
||||
decodeTag = \case
|
||||
"ID" -> Just NRSubId_
|
||||
"OK" -> Just NROk_
|
||||
"ERR" -> Just NRErr_
|
||||
"STAT" -> Just NRStat_
|
||||
_ -> Nothing
|
||||
|
||||
data NtfResponse
|
||||
= NRSubId C.PublicKeyX25519
|
||||
| NROk
|
||||
| NRErr ErrorType
|
||||
| NRStat NtfStatus
|
||||
|
||||
instance Protocol NtfResponse where
|
||||
type Tag NtfResponse = NtfResponseTag
|
||||
encodeProtocol = \case
|
||||
NRSubId dhKey -> e (NRSubId_, ' ', dhKey)
|
||||
NROk -> e NROk_
|
||||
NRErr err -> e (NRErr_, ' ', err)
|
||||
NRStat stat -> e (NRStat_, ' ', stat)
|
||||
where
|
||||
e :: Encoding a => a -> ByteString
|
||||
e = smpEncode
|
||||
|
||||
protocolP = \case
|
||||
NRSubId_ -> NRSubId <$> _smpP
|
||||
NROk_ -> pure NROk
|
||||
NRErr_ -> NRErr <$> _smpP
|
||||
NRStat_ -> NRStat <$> _smpP
|
||||
|
||||
checkCredentials (_, _, subId, _) cmd = case cmd of
|
||||
-- ERR response does not always have subscription ID
|
||||
NRErr _ -> Right cmd
|
||||
-- other server responses must have subscription ID
|
||||
_
|
||||
| B.null subId -> Left $ CMD NO_ENTITY
|
||||
| otherwise -> Right cmd
|
||||
|
||||
data SMPQueueNtfUri = SMPQueueNtfUri
|
||||
{ smpServer :: SMPServer,
|
||||
notifierId :: NotifierId,
|
||||
notifierKey :: NtfPrivateSignKey
|
||||
}
|
||||
|
||||
instance Encoding SMPQueueNtfUri where
|
||||
smpEncode SMPQueueNtfUri {smpServer, notifierId, notifierKey} = smpEncode (smpServer, notifierId, notifierKey)
|
||||
smpP = do
|
||||
(smpServer, notifierId, notifierKey) <- smpP
|
||||
pure $ SMPQueueNtfUri smpServer notifierId notifierKey
|
||||
|
||||
newtype DeviceToken = DeviceToken ByteString
|
||||
|
||||
instance Encoding DeviceToken where
|
||||
smpEncode (DeviceToken t) = smpEncode t
|
||||
smpP = DeviceToken <$> smpP
|
||||
|
||||
type NtfSubsciptionId = ByteString
|
||||
|
||||
data NtfStatus = NSPending | NSActive | NSEnd | NSSMPAuth
|
||||
|
||||
instance Encoding NtfStatus where
|
||||
smpEncode = \case
|
||||
NSPending -> "PENDING"
|
||||
NSActive -> "ACTIVE"
|
||||
NSEnd -> "END"
|
||||
NSSMPAuth -> "SMP_AUTH"
|
||||
smpP =
|
||||
A.takeTill (== ' ') >>= \case
|
||||
"PENDING" -> pure NSPending
|
||||
"ACTIVE" -> pure NSActive
|
||||
"END" -> pure NSEnd
|
||||
"SMP_AUTH" -> pure NSSMPAuth
|
||||
_ -> fail "bad NtfError"
|
||||
111
src/Simplex/Messaging/Notifications/Server.hs
Normal file
111
src/Simplex/Messaging/Notifications/Server.hs
Normal file
@@ -0,0 +1,111 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE FlexibleContexts #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Server where
|
||||
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.IO.Unlift (MonadUnliftIO)
|
||||
import Control.Monad.Reader
|
||||
import Crypto.Random (MonadRandom)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import Network.Socket (ServiceName)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Notifications.Protocol
|
||||
import Simplex.Messaging.Notifications.Server.Env
|
||||
import Simplex.Messaging.Notifications.Server.Subscriptions
|
||||
import Simplex.Messaging.Notifications.Transport
|
||||
import Simplex.Messaging.Protocol (ErrorType (..), Transmission, encodeTransmission, tGet, tPut)
|
||||
import Simplex.Messaging.Server
|
||||
import Simplex.Messaging.Transport (ATransport (..), THandle (..), TProxy, Transport)
|
||||
import Simplex.Messaging.Transport.Server (runTransportServer)
|
||||
import Simplex.Messaging.Util
|
||||
import UnliftIO.Exception
|
||||
import UnliftIO.STM
|
||||
|
||||
runNtfServer :: (MonadRandom m, MonadUnliftIO m) => NtfServerConfig -> m ()
|
||||
runNtfServer cfg = do
|
||||
started <- newEmptyTMVarIO
|
||||
runNtfServerBlocking started cfg
|
||||
|
||||
runNtfServerBlocking :: (MonadRandom m, MonadUnliftIO m) => TMVar Bool -> NtfServerConfig -> m ()
|
||||
runNtfServerBlocking started cfg@NtfServerConfig {transports} = do
|
||||
env <- newNtfServerEnv cfg
|
||||
runReaderT ntfServer env
|
||||
where
|
||||
ntfServer :: (MonadUnliftIO m', MonadReader NtfEnv m') => m' ()
|
||||
ntfServer = raceAny_ (map runServer transports)
|
||||
|
||||
runServer :: (MonadUnliftIO m', MonadReader NtfEnv m') => (ServiceName, ATransport) -> m' ()
|
||||
runServer (tcpPort, ATransport t) = do
|
||||
serverParams <- asks tlsServerParams
|
||||
runTransportServer started tcpPort serverParams (runClient t)
|
||||
|
||||
runClient :: (Transport c, MonadUnliftIO m, MonadReader NtfEnv m) => TProxy c -> c -> m ()
|
||||
runClient _ h = do
|
||||
kh <- asks serverIdentity
|
||||
liftIO (runExceptT $ ntfServerHandshake h kh) >>= \case
|
||||
Right th -> runNtfClientTransport th
|
||||
Left _ -> pure ()
|
||||
|
||||
runNtfClientTransport :: (Transport c, MonadUnliftIO m, MonadReader NtfEnv m) => THandle c -> m ()
|
||||
runNtfClientTransport th@THandle {sessionId} = do
|
||||
q <- asks $ tbqSize . config
|
||||
c <- atomically $ newNtfServerClient q sessionId
|
||||
raceAny_ [send th c, client c, receive th c]
|
||||
`finally` clientDisconnected c
|
||||
|
||||
clientDisconnected :: MonadUnliftIO m => NtfServerClient -> m ()
|
||||
clientDisconnected NtfServerClient {connected} = atomically $ writeTVar connected False
|
||||
|
||||
receive :: (Transport c, MonadUnliftIO m, MonadReader NtfEnv m) => THandle c -> NtfServerClient -> m ()
|
||||
receive th NtfServerClient {rcvQ, sndQ} = forever $ do
|
||||
(sig, signed, (corrId, queueId, cmdOrError)) <- tGet th
|
||||
case cmdOrError of
|
||||
Left e -> write sndQ (corrId, queueId, NRErr e)
|
||||
Right cmd -> do
|
||||
verified <- verifyTransmission sig signed queueId cmd
|
||||
if verified
|
||||
then write rcvQ (corrId, queueId, cmd)
|
||||
else write sndQ (corrId, queueId, NRErr AUTH)
|
||||
where
|
||||
write q t = atomically $ writeTBQueue q t
|
||||
|
||||
send :: (Transport c, MonadUnliftIO m) => THandle c -> NtfServerClient -> m ()
|
||||
send h NtfServerClient {sndQ, sessionId} = forever $ do
|
||||
t <- atomically $ readTBQueue sndQ
|
||||
liftIO $ tPut h (Nothing, encodeTransmission sessionId t)
|
||||
|
||||
verifyTransmission ::
|
||||
forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => Maybe C.ASignature -> ByteString -> NtfSubsciptionId -> NtfCommand -> m Bool
|
||||
verifyTransmission sig_ signed subId cmd = do
|
||||
case cmd of
|
||||
NCCreate _ _ k _ -> pure $ verifyCmdSignature sig_ signed k
|
||||
_ -> do
|
||||
st <- asks store
|
||||
verifySubCmd <$> atomically (getNtfSubscription st subId)
|
||||
where
|
||||
verifySubCmd = \case
|
||||
Right sub -> verifyCmdSignature sig_ signed $ subVerifyKey sub
|
||||
Left _ -> maybe False (dummyVerifyCmd signed) sig_ `seq` False
|
||||
|
||||
client :: forall m. (MonadUnliftIO m, MonadReader NtfEnv m) => NtfServerClient -> m ()
|
||||
client NtfServerClient {rcvQ, sndQ} =
|
||||
forever $
|
||||
atomically (readTBQueue rcvQ)
|
||||
>>= processCommand
|
||||
>>= atomically . writeTBQueue sndQ
|
||||
where
|
||||
processCommand :: Transmission NtfCommand -> m (Transmission NtfResponse)
|
||||
processCommand (corrId, subId, cmd) = case cmd of
|
||||
NCCreate _token _smpQueue _verifyKey _dhKey -> do
|
||||
pure (corrId, subId, NROk)
|
||||
NCCheck -> do
|
||||
pure (corrId, subId, NROk)
|
||||
NCToken _token -> do
|
||||
pure (corrId, subId, NROk)
|
||||
NCDelete -> do
|
||||
pure (corrId, subId, NROk)
|
||||
66
src/Simplex/Messaging/Notifications/Server/Env.hs
Normal file
66
src/Simplex/Messaging/Notifications/Server/Env.hs
Normal file
@@ -0,0 +1,66 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
|
||||
module Simplex.Messaging.Notifications.Server.Env where
|
||||
|
||||
import Control.Monad.IO.Unlift
|
||||
import Crypto.Random
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.X509.Validation (Fingerprint (..))
|
||||
import Network.Socket
|
||||
import qualified Network.TLS as T
|
||||
import Numeric.Natural
|
||||
import Simplex.Messaging.Agent.RetryInterval
|
||||
import Simplex.Messaging.Client
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Notifications.Protocol
|
||||
import Simplex.Messaging.Notifications.Server.Subscriptions
|
||||
import Simplex.Messaging.Protocol (Transmission)
|
||||
import Simplex.Messaging.Transport (ATransport)
|
||||
import Simplex.Messaging.Transport.Server (loadFingerprint, loadTLSServerParams)
|
||||
import UnliftIO.STM
|
||||
|
||||
data NtfServerConfig = NtfServerConfig
|
||||
{ transports :: [(ServiceName, ATransport)],
|
||||
subscriptionIdBytes :: Int,
|
||||
tbqSize :: Natural,
|
||||
smpCfg :: SMPClientConfig,
|
||||
reconnectInterval :: RetryInterval,
|
||||
-- CA certificate private key is not needed for initialization
|
||||
caCertificateFile :: FilePath,
|
||||
privateKeyFile :: FilePath,
|
||||
certificateFile :: FilePath
|
||||
}
|
||||
|
||||
data NtfEnv = NtfEnv
|
||||
{ config :: NtfServerConfig,
|
||||
serverIdentity :: C.KeyHash,
|
||||
store :: NtfSubscriptions,
|
||||
idsDrg :: TVar ChaChaDRG,
|
||||
tlsServerParams :: T.ServerParams,
|
||||
serverIdentity :: C.KeyHash
|
||||
}
|
||||
|
||||
newNtfServerEnv :: (MonadUnliftIO m, MonadRandom m) => NtfServerConfig -> m NtfEnv
|
||||
newNtfServerEnv config@NtfServerConfig {caCertificateFile, certificateFile, privateKeyFile} = do
|
||||
idsDrg <- newTVarIO =<< drgNew
|
||||
store <- newTVarIO M.empty
|
||||
tlsServerParams <- liftIO $ loadTLSServerParams caCertificateFile certificateFile privateKeyFile
|
||||
Fingerprint fp <- liftIO $ loadFingerprint caCertificateFile
|
||||
let serverIdentity = C.KeyHash fp
|
||||
pure NtfEnv {config, store, idsDrg, tlsServerParams, serverIdentity}
|
||||
|
||||
data NtfServerClient = NtfServerClient
|
||||
{ rcvQ :: TBQueue (Transmission NtfCommand),
|
||||
sndQ :: TBQueue (Transmission NtfResponse),
|
||||
sessionId :: ByteString,
|
||||
connected :: TVar Bool
|
||||
}
|
||||
|
||||
newNtfServerClient :: Natural -> ByteString -> STM NtfServerClient
|
||||
newNtfServerClient qSize sessionId = do
|
||||
rcvQ <- newTBQueue qSize
|
||||
sndQ <- newTBQueue qSize
|
||||
connected <- newTVar True
|
||||
return NtfServerClient {rcvQ, sndQ, sessionId, connected}
|
||||
25
src/Simplex/Messaging/Notifications/Server/Subscriptions.hs
Normal file
25
src/Simplex/Messaging/Notifications/Server/Subscriptions.hs
Normal file
@@ -0,0 +1,25 @@
|
||||
module Simplex.Messaging.Notifications.Server.Subscriptions where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Notifications.Protocol
|
||||
import Simplex.Messaging.Protocol (ErrorType (..), NotifierId, NtfPrivateSignKey, SMPServer)
|
||||
|
||||
type NtfSubscriptionsData = Map NtfSubsciptionId NtfSubsciptionRec
|
||||
|
||||
type NtfSubscriptions = TVar NtfSubscriptionsData
|
||||
|
||||
data NtfSubsciptionRec = NtfSubsciptionRec
|
||||
{ smpServer :: SMPServer,
|
||||
notifierId :: NotifierId,
|
||||
notifierKey :: NtfPrivateSignKey,
|
||||
token :: DeviceToken,
|
||||
status :: TVar NtfStatus,
|
||||
subVerifyKey :: C.APublicVerifyKey,
|
||||
subDHSecret :: C.DhSecretX25519
|
||||
}
|
||||
|
||||
getNtfSubscription :: NtfSubscriptions -> NtfSubsciptionId -> STM (Either ErrorType NtfSubsciptionRec)
|
||||
getNtfSubscription st subId = maybe (Left AUTH) Right . M.lookup subId <$> readTVar st
|
||||
19
src/Simplex/Messaging/Notifications/Transport.hs
Normal file
19
src/Simplex/Messaging/Notifications/Transport.hs
Normal file
@@ -0,0 +1,19 @@
|
||||
module Simplex.Messaging.Notifications.Transport where
|
||||
|
||||
import Control.Monad.Except
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Transport
|
||||
|
||||
ntfBlockSize :: Int
|
||||
ntfBlockSize = 512
|
||||
|
||||
-- | Notifcations server transport handshake.
|
||||
ntfServerHandshake :: Transport c => c -> C.KeyHash -> ExceptT TransportError IO (THandle c)
|
||||
ntfServerHandshake c _ = pure $ ntfTHandle c
|
||||
|
||||
-- | Notifcations server client transport handshake.
|
||||
ntfClientHandshake :: Transport c => c -> C.KeyHash -> ExceptT TransportError IO (THandle c)
|
||||
ntfClientHandshake c _ = pure $ ntfTHandle c
|
||||
|
||||
ntfTHandle :: Transport c => c -> THandle c
|
||||
ntfTHandle c = THandle {connection = c, sessionId = tlsUnique c, blockSize = ntfBlockSize, thVersion = 0}
|
||||
@@ -37,7 +37,7 @@ module Simplex.Messaging.Protocol
|
||||
e2eEncMessageLength,
|
||||
|
||||
-- * SMP protocol types
|
||||
Protocol,
|
||||
Protocol (..),
|
||||
Command (..),
|
||||
Party (..),
|
||||
Cmd (..),
|
||||
@@ -74,9 +74,11 @@ module Simplex.Messaging.Protocol
|
||||
MsgBody,
|
||||
|
||||
-- * Parse and serialize
|
||||
ProtocolMsgTag (..),
|
||||
messageTagP,
|
||||
encodeTransmission,
|
||||
transmissionP,
|
||||
encodeProtocol,
|
||||
_smpP,
|
||||
|
||||
-- * TCP transport functions
|
||||
tPut,
|
||||
@@ -174,7 +176,7 @@ data RawTransmission = RawTransmission
|
||||
signed :: ByteString,
|
||||
sessId :: ByteString,
|
||||
corrId :: ByteString,
|
||||
queueId :: ByteString,
|
||||
entityId :: ByteString,
|
||||
command :: ByteString
|
||||
}
|
||||
|
||||
@@ -512,8 +514,8 @@ data CommandError
|
||||
NO_AUTH
|
||||
| -- | transmission has credentials that are not allowed for this command
|
||||
HAS_AUTH
|
||||
| -- | transmission has no required queue ID
|
||||
NO_QUEUE
|
||||
| -- | transmission has no required entity ID (e.g. SMP queue)
|
||||
NO_ENTITY
|
||||
deriving (Eq, Generic, Read, Show)
|
||||
|
||||
instance ToJSON CommandError where
|
||||
@@ -534,9 +536,9 @@ transmissionP = do
|
||||
trn signature signed = do
|
||||
sessId <- smpP
|
||||
corrId <- smpP
|
||||
queueId <- smpP
|
||||
entityId <- smpP
|
||||
command <- A.takeByteString
|
||||
pure RawTransmission {signature, signed, sessId, corrId, queueId, command}
|
||||
pure RawTransmission {signature, signed, sessId, corrId, entityId, command}
|
||||
|
||||
class Protocol msg where
|
||||
type Tag msg
|
||||
@@ -571,7 +573,7 @@ instance PartyI p => Protocol (Command p) where
|
||||
| otherwise -> Right cmd
|
||||
-- SEND must have queue ID, signature is not always required
|
||||
SEND _
|
||||
| B.null queueId -> Left $ CMD NO_QUEUE
|
||||
| B.null queueId -> Left $ CMD NO_ENTITY
|
||||
| otherwise -> Right cmd
|
||||
-- PING must not have queue ID or signature
|
||||
PING
|
||||
@@ -640,7 +642,7 @@ instance Protocol BrokerMsg where
|
||||
| otherwise -> Left $ CMD HAS_AUTH
|
||||
-- other broker responses must have queue ID
|
||||
_
|
||||
| B.null queueId -> Left $ CMD NO_QUEUE
|
||||
| B.null queueId -> Left $ CMD NO_ENTITY
|
||||
| otherwise -> Right cmd
|
||||
|
||||
_smpP :: Encoding a => Parser a
|
||||
@@ -695,14 +697,15 @@ instance Encoding CommandError where
|
||||
SYNTAX -> "SYNTAX"
|
||||
NO_AUTH -> "NO_AUTH"
|
||||
HAS_AUTH -> "HAS_AUTH"
|
||||
NO_QUEUE -> "NO_QUEUE"
|
||||
NO_ENTITY -> "NO_ENTITY"
|
||||
smpP =
|
||||
A.takeTill (== ' ') >>= \case
|
||||
"UNKNOWN" -> pure UNKNOWN
|
||||
"SYNTAX" -> pure SYNTAX
|
||||
"NO_AUTH" -> pure NO_AUTH
|
||||
"HAS_AUTH" -> pure HAS_AUTH
|
||||
"NO_QUEUE" -> pure NO_QUEUE
|
||||
"NO_ENTITY" -> pure NO_ENTITY
|
||||
"NO_QUEUE" -> pure NO_ENTITY
|
||||
_ -> fail "bad command error type"
|
||||
|
||||
-- | Send signed SMP transmission to TCP transport.
|
||||
@@ -727,9 +730,9 @@ tGet th@THandle {sessionId} = liftIO (tGetParse th) >>= decodeParseValidate
|
||||
where
|
||||
decodeParseValidate :: Either TransportError RawTransmission -> m (SignedTransmission cmd)
|
||||
decodeParseValidate = \case
|
||||
Right RawTransmission {signature, signed, sessId, corrId, queueId, command}
|
||||
Right RawTransmission {signature, signed, sessId, corrId, entityId, command}
|
||||
| sessId == sessionId ->
|
||||
let decodedTransmission = (,corrId,queueId,command) <$> C.decodeSignature signature
|
||||
let decodedTransmission = (,corrId,entityId,command) <$> C.decodeSignature signature
|
||||
in either (const $ tError corrId) (tParseValidate signed) decodedTransmission
|
||||
| otherwise -> pure (Nothing, "", (CorrId corrId, "", Left SESSION))
|
||||
Left _ -> tError ""
|
||||
@@ -738,6 +741,6 @@ tGet th@THandle {sessionId} = liftIO (tGetParse th) >>= decodeParseValidate
|
||||
tError corrId = pure (Nothing, "", (CorrId corrId, "", Left BLOCK))
|
||||
|
||||
tParseValidate :: ByteString -> SignedRawTransmission -> m (SignedTransmission cmd)
|
||||
tParseValidate signed t@(sig, corrId, queueId, command) = do
|
||||
tParseValidate signed t@(sig, corrId, entityId, command) = do
|
||||
let cmd = parseProtocol command >>= checkCredentials t
|
||||
pure (sig, signed, (CorrId corrId, queueId, cmd))
|
||||
pure (sig, signed, (CorrId corrId, entityId, cmd))
|
||||
|
||||
@@ -23,7 +23,7 @@
|
||||
-- and optional append only log of SMP queue records.
|
||||
--
|
||||
-- See https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md
|
||||
module Simplex.Messaging.Server (runSMPServer, runSMPServerBlocking) where
|
||||
module Simplex.Messaging.Server (runSMPServer, runSMPServerBlocking, verifyCmdSignature, dummyVerifyCmd) where
|
||||
|
||||
import Control.Concurrent.STM (stateTVar)
|
||||
import Control.Monad
|
||||
@@ -121,7 +121,7 @@ runSMPServerBlocking started cfg@ServerConfig {transports} = do
|
||||
runClient :: (Transport c, MonadUnliftIO m, MonadReader Env m) => TProxy c -> c -> m ()
|
||||
runClient _ h = do
|
||||
kh <- asks serverIdentity
|
||||
liftIO (runExceptT $ serverHandshake h kh) >>= \case
|
||||
liftIO (runExceptT $ smpServerHandshake h kh) >>= \case
|
||||
Right th -> runClientTransport th
|
||||
Left _ -> pure ()
|
||||
|
||||
@@ -176,8 +176,8 @@ verifyTransmission ::
|
||||
forall m. (MonadUnliftIO m, MonadReader Env m) => Maybe C.ASignature -> ByteString -> QueueId -> Cmd -> m Bool
|
||||
verifyTransmission sig_ signed queueId cmd = do
|
||||
case cmd of
|
||||
Cmd SRecipient (NEW k _) -> pure $ verifySignature k
|
||||
Cmd SRecipient _ -> verifyCmd SRecipient $ verifySignature . recipientKey
|
||||
Cmd SRecipient (NEW k _) -> pure $ verifyCmdSignature sig_ signed k
|
||||
Cmd SRecipient _ -> verifyCmd SRecipient $ verifyCmdSignature sig_ signed . recipientKey
|
||||
Cmd SSender (SEND _) -> verifyCmd SSender $ verifyMaybe . senderKey
|
||||
Cmd SSender PING -> pure True
|
||||
Cmd SNotifier NSUB -> verifyCmd SNotifier $ verifyMaybe . fmap snd . notifier
|
||||
@@ -186,18 +186,21 @@ verifyTransmission sig_ signed queueId cmd = do
|
||||
verifyCmd party f = do
|
||||
st <- asks queueStore
|
||||
q <- atomically $ getQueue st party queueId
|
||||
pure $ either (const $ maybe False dummyVerify sig_ `seq` False) f q
|
||||
pure $ either (const $ maybe False (dummyVerifyCmd signed) sig_ `seq` False) f q
|
||||
verifyMaybe :: Maybe C.APublicVerifyKey -> Bool
|
||||
verifyMaybe = maybe (isNothing sig_) verifySignature
|
||||
verifySignature :: C.APublicVerifyKey -> Bool
|
||||
verifySignature key = maybe False (verify key) sig_
|
||||
verifyMaybe = maybe (isNothing sig_) $ verifyCmdSignature sig_ signed
|
||||
|
||||
verifyCmdSignature :: Maybe C.ASignature -> ByteString -> C.APublicVerifyKey -> Bool
|
||||
verifyCmdSignature sig_ signed key = maybe False (verify key) sig_
|
||||
where
|
||||
verify :: C.APublicVerifyKey -> C.ASignature -> Bool
|
||||
verify (C.APublicVerifyKey a k) sig@(C.ASignature a' s) =
|
||||
case (testEquality a a', C.signatureSize k == C.signatureSize s) of
|
||||
(Just Refl, True) -> C.verify' k s signed
|
||||
_ -> dummyVerify sig `seq` False
|
||||
dummyVerify :: C.ASignature -> Bool
|
||||
dummyVerify (C.ASignature _ s) = C.verify' (dummyPublicKey s) s signed
|
||||
_ -> dummyVerifyCmd signed sig `seq` False
|
||||
|
||||
dummyVerifyCmd :: ByteString -> C.ASignature -> Bool
|
||||
dummyVerifyCmd signed (C.ASignature _ s) = C.verify' (dummyPublicKey s) s signed
|
||||
|
||||
-- These dummy keys are used with `dummyVerify` function to mitigate timing attacks
|
||||
-- by having the same time of the response whether a queue exists or nor, for all valid key/signature sizes
|
||||
|
||||
@@ -26,7 +26,6 @@
|
||||
-- See https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#appendix-a
|
||||
module Simplex.Messaging.Transport
|
||||
( -- * SMP transport parameters
|
||||
smpBlockSize,
|
||||
supportedSMPVersions,
|
||||
simplexMQVersion,
|
||||
|
||||
@@ -46,8 +45,8 @@ module Simplex.Messaging.Transport
|
||||
-- * SMP transport
|
||||
THandle (..),
|
||||
TransportError (..),
|
||||
serverHandshake,
|
||||
clientHandshake,
|
||||
smpServerHandshake,
|
||||
smpClientHandshake,
|
||||
tPutBlock,
|
||||
tGetBlock,
|
||||
serializeTransportError,
|
||||
@@ -251,8 +250,9 @@ trimCR s = if B.last s == '\r' then B.init s else s
|
||||
data THandle c = THandle
|
||||
{ connection :: c,
|
||||
sessionId :: ByteString,
|
||||
-- | agreed SMP server protocol version
|
||||
smpVersion :: Version
|
||||
blockSize :: Int,
|
||||
-- | agreed server protocol version
|
||||
thVersion :: Version
|
||||
}
|
||||
|
||||
data ServerHandshake = ServerHandshake
|
||||
@@ -332,45 +332,45 @@ serializeTransportError = \case
|
||||
|
||||
-- | Pad and send block to SMP transport.
|
||||
tPutBlock :: Transport c => THandle c -> ByteString -> IO (Either TransportError ())
|
||||
tPutBlock THandle {connection = c} block =
|
||||
tPutBlock THandle {connection = c, blockSize} block =
|
||||
bimapM (const $ pure TELargeMsg) (cPut c) $
|
||||
C.pad block smpBlockSize
|
||||
C.pad block blockSize
|
||||
|
||||
-- | Receive block from SMP transport.
|
||||
tGetBlock :: Transport c => THandle c -> IO (Either TransportError ByteString)
|
||||
tGetBlock THandle {connection = c} =
|
||||
cGet c smpBlockSize >>= \case
|
||||
tGetBlock THandle {connection = c, blockSize} =
|
||||
cGet c blockSize >>= \case
|
||||
"" -> ioe_EOF
|
||||
msg -> pure . first (const TELargeMsg) $ C.unPad msg
|
||||
|
||||
-- | Server SMP transport handshake.
|
||||
--
|
||||
-- See https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#appendix-a
|
||||
serverHandshake :: forall c. Transport c => c -> C.KeyHash -> ExceptT TransportError IO (THandle c)
|
||||
serverHandshake c kh = do
|
||||
let th@THandle {sessionId} = tHandle c
|
||||
smpServerHandshake :: forall c. Transport c => c -> C.KeyHash -> ExceptT TransportError IO (THandle c)
|
||||
smpServerHandshake c kh = do
|
||||
let th@THandle {sessionId} = smpTHandle c
|
||||
sendHandshake th $ ServerHandshake {sessionId, smpVersionRange = supportedSMPVersions}
|
||||
getHandshake th >>= \case
|
||||
ClientHandshake {smpVersion, keyHash}
|
||||
| keyHash /= kh ->
|
||||
throwE $ TEHandshake IDENTITY
|
||||
| smpVersion `isCompatible` supportedSMPVersions -> do
|
||||
pure (th :: THandle c) {smpVersion}
|
||||
pure (th :: THandle c) {thVersion = smpVersion}
|
||||
| otherwise -> throwE $ TEHandshake VERSION
|
||||
|
||||
-- | Client SMP transport handshake.
|
||||
--
|
||||
-- See https://github.com/simplex-chat/simplexmq/blob/master/protocol/simplex-messaging.md#appendix-a
|
||||
clientHandshake :: forall c. Transport c => c -> C.KeyHash -> ExceptT TransportError IO (THandle c)
|
||||
clientHandshake c keyHash = do
|
||||
let th@THandle {sessionId} = tHandle c
|
||||
smpClientHandshake :: forall c. Transport c => c -> C.KeyHash -> ExceptT TransportError IO (THandle c)
|
||||
smpClientHandshake c keyHash = do
|
||||
let th@THandle {sessionId} = smpTHandle c
|
||||
ServerHandshake {sessionId = sessId, smpVersionRange} <- getHandshake th
|
||||
if sessionId /= sessId
|
||||
then throwE TEBadSession
|
||||
else case smpVersionRange `compatibleVersion` supportedSMPVersions of
|
||||
Just (Compatible smpVersion) -> do
|
||||
sendHandshake th $ ClientHandshake {smpVersion, keyHash}
|
||||
pure (th :: THandle c) {smpVersion}
|
||||
pure (th :: THandle c) {thVersion = smpVersion}
|
||||
Nothing -> throwE $ TEHandshake VERSION
|
||||
|
||||
sendHandshake :: (Transport c, Encoding smp) => THandle c -> smp -> ExceptT TransportError IO ()
|
||||
@@ -379,6 +379,5 @@ sendHandshake th = ExceptT . tPutBlock th . smpEncode
|
||||
getHandshake :: (Transport c, Encoding smp) => THandle c -> ExceptT TransportError IO smp
|
||||
getHandshake th = ExceptT $ (parse smpP (TEHandshake PARSE) =<<) <$> tGetBlock th
|
||||
|
||||
tHandle :: Transport c => c -> THandle c
|
||||
tHandle c =
|
||||
THandle {connection = c, sessionId = tlsUnique c, smpVersion = 0}
|
||||
smpTHandle :: Transport c => c -> THandle c
|
||||
smpTHandle c = THandle {connection = c, sessionId = tlsUnique c, blockSize = smpBlockSize, thVersion = 0}
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
module Simplex.Messaging.Transport.Client
|
||||
( runTransportClient,
|
||||
clientHandshake,
|
||||
smpClientHandshake,
|
||||
)
|
||||
where
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ module Simplex.Messaging.Transport.Server
|
||||
( runTransportServer,
|
||||
loadTLSServerParams,
|
||||
loadFingerprint,
|
||||
serverHandshake,
|
||||
smpServerHandshake,
|
||||
)
|
||||
where
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ testStoreLogFile = "tests/tmp/smp-server-store.log"
|
||||
testSMPClient :: (Transport c, MonadUnliftIO m) => (THandle c -> m a) -> m a
|
||||
testSMPClient client =
|
||||
runTransportClient testHost testPort testKeyHash (Just defaultKeepAliveOpts) $ \h ->
|
||||
liftIO (runExceptT $ clientHandshake h testKeyHash) >>= \case
|
||||
liftIO (runExceptT $ smpClientHandshake h testKeyHash) >>= \case
|
||||
Right th -> client th
|
||||
Left e -> error $ show e
|
||||
|
||||
|
||||
@@ -496,7 +496,7 @@ syntaxTests (ATransport t) = do
|
||||
describe "SEND" $ do
|
||||
it "valid syntax" $ (sampleSig, "cdab", "12345678", (SEND_, ' ', "hello" :: ByteString)) >#> ("", "cdab", "12345678", ERR AUTH)
|
||||
it "no parameters" $ (sampleSig, "abcd", "12345678", SEND_) >#> ("", "abcd", "12345678", ERR $ CMD SYNTAX)
|
||||
it "no queue ID" $ (sampleSig, "bcda", "", (SEND_, ' ', "hello" :: ByteString)) >#> ("", "bcda", "", ERR $ CMD NO_QUEUE)
|
||||
it "no queue ID" $ (sampleSig, "bcda", "", (SEND_, ' ', "hello" :: ByteString)) >#> ("", "bcda", "", ERR $ CMD NO_ENTITY)
|
||||
describe "PING" $ do
|
||||
it "valid syntax" $ ("", "abcd", "", PING_) >#> ("", "abcd", "", PONG)
|
||||
describe "broker response not allowed" $ do
|
||||
|
||||
Reference in New Issue
Block a user