mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-07-03 19:52:48 +00:00
smp-server: namespaces resolver scaffolding
This commit is contained in:
@@ -15,6 +15,7 @@ module Simplex.Messaging.Encoding
|
||||
smpEncodeList,
|
||||
smpListP,
|
||||
lenEncode,
|
||||
lenP,
|
||||
)
|
||||
where
|
||||
|
||||
|
||||
@@ -163,6 +163,18 @@ module Simplex.Messaging.Protocol
|
||||
EncTransmission (..),
|
||||
FwdResponse (..),
|
||||
FwdTransmission (..),
|
||||
LookupKey (..),
|
||||
unLookupKey,
|
||||
NameRecord (..),
|
||||
NameOwner,
|
||||
mkNameOwner,
|
||||
unNameOwner,
|
||||
NameLink,
|
||||
mkNameLink,
|
||||
unNameLink,
|
||||
nameRecBytes,
|
||||
parseNameRec,
|
||||
smpListPUpTo,
|
||||
MsgFlags (..),
|
||||
initialSMPClientVersion,
|
||||
currentSMPClientVersion,
|
||||
@@ -225,6 +237,7 @@ where
|
||||
|
||||
import Control.Applicative (optional, (<|>))
|
||||
import Control.Exception (Exception, SomeException, displayException, fromException)
|
||||
import Control.Monad (when)
|
||||
import Control.Monad.Except
|
||||
import Data.Aeson (FromJSON (..), ToJSON (..))
|
||||
import qualified Data.Aeson as J
|
||||
@@ -250,7 +263,7 @@ import Data.Maybe (isJust, isNothing)
|
||||
import Data.String
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Data.Text.Encoding (decodeLatin1, encodeUtf8)
|
||||
import Data.Text.Encoding (decodeLatin1, decodeUtf8', encodeUtf8)
|
||||
import Data.Time.Clock.System (SystemTime (..), systemToUTCTime)
|
||||
import Data.Type.Equality
|
||||
import Data.Word (Word8, Word16)
|
||||
@@ -343,6 +356,7 @@ data Party
|
||||
| LinkClient
|
||||
| ProxiedClient
|
||||
| ProxyService
|
||||
| Resolver
|
||||
deriving (Show)
|
||||
|
||||
-- | Singleton types for SMP protocol clients
|
||||
@@ -357,6 +371,7 @@ data SParty :: Party -> Type where
|
||||
SSenderLink :: SParty LinkClient
|
||||
SProxiedClient :: SParty ProxiedClient
|
||||
SProxyService :: SParty ProxyService
|
||||
SResolver :: SParty Resolver
|
||||
|
||||
instance TestEquality SParty where
|
||||
testEquality SCreator SCreator = Just Refl
|
||||
@@ -369,6 +384,7 @@ instance TestEquality SParty where
|
||||
testEquality SSenderLink SSenderLink = Just Refl
|
||||
testEquality SProxiedClient SProxiedClient = Just Refl
|
||||
testEquality SProxyService SProxyService = Just Refl
|
||||
testEquality SResolver SResolver = Just Refl
|
||||
testEquality _ _ = Nothing
|
||||
|
||||
deriving instance Show (SParty p)
|
||||
@@ -395,6 +411,8 @@ instance PartyI ProxiedClient where sParty = SProxiedClient
|
||||
|
||||
instance PartyI ProxyService where sParty = SProxyService
|
||||
|
||||
instance PartyI Resolver where sParty = SResolver
|
||||
|
||||
-- command parties that can read queues
|
||||
type family QueueParty (p :: Party) :: Constraint where
|
||||
QueueParty Recipient = ()
|
||||
@@ -473,6 +491,7 @@ partyClientRole = \case
|
||||
SSenderLink -> Just SRMessaging
|
||||
SProxiedClient -> Just SRMessaging
|
||||
SProxyService -> Just SRProxy
|
||||
SResolver -> Nothing
|
||||
{-# INLINE partyClientRole #-}
|
||||
|
||||
partyServiceRole :: ServiceParty p => SParty p -> SMPServiceRole
|
||||
@@ -550,6 +569,21 @@ type LinkId = QueueId
|
||||
-- | SMP queue ID on the server.
|
||||
type QueueId = EntityId
|
||||
|
||||
-- | Name lookup key — opaque bytes; namespace/casing per RFC enforced client-side.
|
||||
newtype LookupKey = LookupKey ByteString
|
||||
deriving (Eq, Show)
|
||||
|
||||
unLookupKey :: LookupKey -> ByteString
|
||||
unLookupKey (LookupKey s) = s
|
||||
{-# INLINE unLookupKey #-}
|
||||
|
||||
instance Encoding LookupKey where
|
||||
smpEncode (LookupKey s) = smpEncode s
|
||||
smpP = do
|
||||
n <- lenP
|
||||
when (n > 64) $ fail "LookupKey too long"
|
||||
LookupKey <$> A.take n
|
||||
|
||||
-- | Parameterized type for SMP protocol commands from all clients.
|
||||
data Command (p :: Party) where
|
||||
-- SMP recipient commands
|
||||
@@ -597,6 +631,8 @@ data Command (p :: Party) where
|
||||
-- - entity ID: empty
|
||||
-- - corrId: unique correlation ID between proxy and relay, also used as a nonce to encrypt forwarded transmission
|
||||
RFWD :: EncFwdTransmission -> Command ProxyService -- use CorrId as CbNonce, proxy to relay
|
||||
-- Name resolution: forwarded-only via PFWD. Server reads SNRC contract via Ethereum JSON-RPC.
|
||||
RSLV :: LookupKey -> Command Resolver
|
||||
|
||||
deriving instance Show (Command p)
|
||||
|
||||
@@ -705,6 +741,96 @@ instance Encoding FwdTransmission where
|
||||
newtype EncFwdTransmission = EncFwdTransmission ByteString
|
||||
deriving (Show)
|
||||
|
||||
-- | 20-byte Ethereum address (NameRecord owner). Bare constructor not exported;
|
||||
-- use `mkNameOwner` to enforce the 20-byte invariant.
|
||||
newtype NameOwner = NameOwner ByteString
|
||||
deriving (Eq, Show)
|
||||
|
||||
mkNameOwner :: ByteString -> Either String NameOwner
|
||||
mkNameOwner bs
|
||||
| B.length bs == 20 = Right (NameOwner bs)
|
||||
| otherwise = Left "NameOwner must be 20 bytes"
|
||||
|
||||
unNameOwner :: NameOwner -> ByteString
|
||||
unNameOwner (NameOwner bs) = bs
|
||||
{-# INLINE unNameOwner #-}
|
||||
|
||||
instance Encoding NameOwner where
|
||||
smpEncode (NameOwner bs) = bs
|
||||
{-# INLINE smpEncode #-}
|
||||
smpP = NameOwner <$> A.take 20
|
||||
|
||||
-- | A name-record link (channel or contact). Bare constructor not exported;
|
||||
-- use `mkNameLink` to enforce the ≤1024-byte UTF-8 invariant.
|
||||
newtype NameLink = NameLink Text
|
||||
deriving (Eq, Show)
|
||||
|
||||
mkNameLink :: Text -> Either String NameLink
|
||||
mkNameLink t
|
||||
| B.length (encodeUtf8 t) <= 1024 = Right (NameLink t)
|
||||
| otherwise = Left "NameLink too long"
|
||||
|
||||
unNameLink :: NameLink -> Text
|
||||
unNameLink (NameLink t) = t
|
||||
{-# INLINE unNameLink #-}
|
||||
|
||||
instance Encoding NameLink where
|
||||
smpEncode (NameLink t) =
|
||||
let bs = encodeUtf8 t
|
||||
in smpEncode @Word16 (fromIntegral $ B.length bs) <> bs
|
||||
smpP = do
|
||||
n <- fromIntegral <$> smpP @Word16
|
||||
when (n > 1024) $ fail "NameLink too long"
|
||||
bs <- A.take n
|
||||
either (fail . show) (pure . NameLink) (decodeUtf8' bs)
|
||||
|
||||
-- | Resolved name record returned by the names role.
|
||||
-- Field additions are gated by future SMP version bumps (matching IDS QIK precedent).
|
||||
data NameRecord = NameRecord
|
||||
{ nrDisplayName :: Text, -- ≤255 bytes UTF-8 (enforced by Encoding ByteString length prefix)
|
||||
nrOwner :: NameOwner,
|
||||
nrChannelLinks :: [NameLink],
|
||||
nrContactLinks :: [NameLink],
|
||||
nrAdminAddress :: Maybe Text,
|
||||
nrAdminEmail :: Maybe Text,
|
||||
nrExpiry :: Int64, -- Unix seconds, ≥ 0
|
||||
nrIsTest :: Bool
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
-- | Bounded list parser — caps element count before allocating.
|
||||
smpListPUpTo :: Encoding a => Int -> Parser [a]
|
||||
smpListPUpTo cap = do
|
||||
n <- lenP
|
||||
when (n > cap) $ fail "list too long"
|
||||
A.count n smpP
|
||||
|
||||
-- | Encode NameRecord on the wire. Version-branched in the same shape as IDS QIK.
|
||||
nameRecBytes :: VersionSMP -> NameRecord -> ByteString
|
||||
nameRecBytes _v NameRecord {nrDisplayName, nrOwner, nrChannelLinks, nrContactLinks, nrAdminAddress, nrAdminEmail, nrExpiry, nrIsTest} =
|
||||
smpEncode nrDisplayName
|
||||
<> smpEncode nrOwner
|
||||
<> smpEncodeList nrChannelLinks
|
||||
<> smpEncodeList nrContactLinks
|
||||
<> smpEncode nrAdminAddress
|
||||
<> smpEncode nrAdminEmail
|
||||
<> smpEncode nrExpiry
|
||||
<> smpEncode nrIsTest
|
||||
|
||||
-- | Parse NameRecord. Combined channel+contact list cap is 8.
|
||||
parseNameRec :: VersionSMP -> Parser NameRecord
|
||||
parseNameRec _v = do
|
||||
nrDisplayName <- smpP
|
||||
nrOwner <- smpP
|
||||
nrChannelLinks <- smpListPUpTo 8
|
||||
nrContactLinks <- smpListPUpTo (8 - length nrChannelLinks)
|
||||
nrAdminAddress <- smpP
|
||||
nrAdminEmail <- smpP
|
||||
nrExpiry <- smpP
|
||||
when (nrExpiry < 0) $ fail "expiry must be non-negative"
|
||||
nrIsTest <- smpP
|
||||
pure NameRecord {nrDisplayName, nrOwner, nrChannelLinks, nrContactLinks, nrAdminAddress, nrAdminEmail, nrExpiry, nrIsTest}
|
||||
|
||||
data BrokerMsg where
|
||||
-- SMP broker messages (responses, client messages, notifications)
|
||||
IDS :: QueueIdsKeys -> BrokerMsg
|
||||
@@ -732,6 +858,8 @@ data BrokerMsg where
|
||||
OK :: BrokerMsg
|
||||
ERR :: ErrorType -> BrokerMsg
|
||||
PONG :: BrokerMsg
|
||||
-- Name resolution response. Returned only for forwarded RSLV.
|
||||
NAME :: NameRecord -> BrokerMsg
|
||||
deriving (Eq, Show)
|
||||
|
||||
data RcvMessage = RcvMessage
|
||||
@@ -942,6 +1070,7 @@ data CommandTag (p :: Party) where
|
||||
RFWD_ :: CommandTag ProxyService
|
||||
NSUB_ :: CommandTag Notifier
|
||||
NSUBS_ :: CommandTag NotifierService
|
||||
RSLV_ :: CommandTag Resolver
|
||||
|
||||
data CmdTag = forall p. PartyI p => CT (SParty p) (CommandTag p)
|
||||
|
||||
@@ -968,6 +1097,7 @@ data BrokerMsgTag
|
||||
| OK_
|
||||
| ERR_
|
||||
| PONG_
|
||||
| NAME_
|
||||
deriving (Show)
|
||||
|
||||
class ProtocolMsgTag t where
|
||||
@@ -1004,6 +1134,7 @@ instance PartyI p => Encoding (CommandTag p) where
|
||||
RFWD_ -> "RFWD"
|
||||
NSUB_ -> "NSUB"
|
||||
NSUBS_ -> "NSUBS"
|
||||
RSLV_ -> "RSLV"
|
||||
smpP = messageTagP
|
||||
|
||||
instance ProtocolMsgTag CmdTag where
|
||||
@@ -1032,6 +1163,7 @@ instance ProtocolMsgTag CmdTag where
|
||||
"RFWD" -> Just $ CT SProxyService RFWD_
|
||||
"NSUB" -> Just $ CT SNotifier NSUB_
|
||||
"NSUBS" -> Just $ CT SNotifierService NSUBS_
|
||||
"RSLV" -> Just $ CT SResolver RSLV_
|
||||
_ -> Nothing
|
||||
|
||||
instance Encoding CmdTag where
|
||||
@@ -1061,6 +1193,7 @@ instance Encoding BrokerMsgTag where
|
||||
OK_ -> "OK"
|
||||
ERR_ -> "ERR"
|
||||
PONG_ -> "PONG"
|
||||
NAME_ -> "NAME"
|
||||
smpP = messageTagP
|
||||
|
||||
instance ProtocolMsgTag BrokerMsgTag where
|
||||
@@ -1083,6 +1216,7 @@ instance ProtocolMsgTag BrokerMsgTag where
|
||||
"OK" -> Just OK_
|
||||
"ERR" -> Just ERR_
|
||||
"PONG" -> Just PONG_
|
||||
"NAME" -> Just NAME_
|
||||
_ -> Nothing
|
||||
|
||||
-- | SMP message body format
|
||||
@@ -1792,6 +1926,7 @@ instance PartyI p => ProtocolEncoding SMPVersion ErrorType (Command p) where
|
||||
PRXY host auth_ -> e (PRXY_, ' ', host, auth_)
|
||||
PFWD fwdV pubKey (EncTransmission s) -> e (PFWD_, ' ', fwdV, pubKey, Tail s)
|
||||
RFWD (EncFwdTransmission s) -> e (RFWD_, ' ', Tail s)
|
||||
RSLV key -> e (RSLV_, ' ', key)
|
||||
where
|
||||
e :: Encoding a => a -> ByteString
|
||||
e = smpEncode
|
||||
@@ -1816,6 +1951,7 @@ instance PartyI p => ProtocolEncoding SMPVersion ErrorType (Command p) where
|
||||
PRXY {} -> noAuthCmd
|
||||
PFWD {} -> entityCmd
|
||||
RFWD _ -> noAuthCmd
|
||||
RSLV _ -> noAuthCmd
|
||||
SUB -> serviceCmd
|
||||
NSUB -> serviceCmd
|
||||
-- other client commands must have both signature and queue ID
|
||||
@@ -1899,6 +2035,9 @@ instance ProtocolEncoding SMPVersion ErrorType Cmd where
|
||||
CT SNotifierService NSUBS_
|
||||
| v >= rcvServiceSMPVersion -> Cmd SNotifierService <$> (NSUBS <$> _smpP <*> smpP)
|
||||
| otherwise -> pure $ Cmd SNotifierService $ NSUBS (-1) mempty
|
||||
CT SResolver RSLV_
|
||||
| v >= namesSMPVersion -> Cmd SResolver . RSLV <$> _smpP
|
||||
| otherwise -> fail "RSLV requires namesSMPVersion"
|
||||
|
||||
fromProtocolError = fromProtocolError @SMPVersion @ErrorType @BrokerMsg
|
||||
{-# INLINE fromProtocolError #-}
|
||||
@@ -1945,6 +2084,9 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
|
||||
| v < clientNoticesSMPVersion -> BLOCKED info {notice = Nothing}
|
||||
_ -> err
|
||||
PONG -> e PONG_
|
||||
NAME rec
|
||||
| v >= namesSMPVersion -> e (NAME_, ' ') <> nameRecBytes v rec
|
||||
| otherwise -> e (ERR_, ' ', AUTH) -- pre-v20: shouldn't reach here, degrade to AUTH
|
||||
where
|
||||
e :: Encoding a => a -> ByteString
|
||||
e = smpEncode
|
||||
@@ -1992,6 +2134,9 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
|
||||
OK_ -> pure OK
|
||||
ERR_ -> ERR <$> _smpP
|
||||
PONG_ -> pure PONG
|
||||
NAME_
|
||||
| v >= namesSMPVersion -> NAME <$> (A.space *> parseNameRec v)
|
||||
| otherwise -> fail "NAME requires namesSMPVersion"
|
||||
where
|
||||
serviceRespP resp
|
||||
| v >= rcvServiceSMPVersion = resp <$> _smpP <*> smpP
|
||||
@@ -2014,6 +2159,7 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
|
||||
PKEY {} -> noEntityMsg
|
||||
RRES _ -> noEntityMsg
|
||||
ALLS -> noEntityMsg
|
||||
NAME _ -> noEntityMsg
|
||||
-- other broker responses must have queue ID
|
||||
_
|
||||
| B.null entId -> Left $ CMD NO_ENTITY
|
||||
|
||||
@@ -108,6 +108,7 @@ import Simplex.Messaging.Server.Env.STM as Env
|
||||
import Simplex.Messaging.Server.Expiration
|
||||
import Simplex.Messaging.Server.MsgStore
|
||||
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore, JournalQueue (..), getJournalQueueMessages)
|
||||
import Simplex.Messaging.Server.Names (ResolveError (..), closeNamesEnv, resolveName)
|
||||
import Simplex.Messaging.Server.MsgStore.STM
|
||||
import Simplex.Messaging.Server.MsgStore.Types
|
||||
import Simplex.Messaging.Server.NtfStore
|
||||
@@ -245,7 +246,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
saveServerStats
|
||||
|
||||
closeServer :: M s ()
|
||||
closeServer = asks (smpAgent . proxyAgent) >>= liftIO . closeSMPClientAgent
|
||||
closeServer = do
|
||||
asks (smpAgent . proxyAgent) >>= liftIO . closeSMPClientAgent
|
||||
asks namesEnv >>= liftIO . mapM_ closeNamesEnv
|
||||
|
||||
serverThread ::
|
||||
forall sub. String ->
|
||||
@@ -513,7 +516,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
|
||||
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
|
||||
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
|
||||
ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedAllB, qDeletedNew, qDeletedSecured, qSub, qSubAllB, qSubAuth, qSubDuplicate, qSubProhibited, qSubEnd, qSubEndB, ntfCreated, ntfDeleted, ntfDeletedB, ntfSub, ntfSubB, ntfSubAuth, ntfSubDuplicate, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgRecvGet, msgGet, msgGetNoMsg, msgGetAuth, msgGetDuplicate, msgGetProhibited, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, ntfCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv, rcvServices, ntfServices}
|
||||
ss@ServerStats {fromTime, qCreated, qSecured, qDeletedAll, qDeletedAllB, qDeletedNew, qDeletedSecured, qSub, qSubAllB, qSubAuth, qSubDuplicate, qSubProhibited, qSubEnd, qSubEndB, ntfCreated, ntfDeleted, ntfDeletedB, ntfSub, ntfSubB, ntfSubAuth, ntfSubDuplicate, msgSent, msgSentAuth, msgSentQuota, msgSentLarge, msgRecv, msgRecvGet, msgGet, msgGetNoMsg, msgGetAuth, msgGetDuplicate, msgGetProhibited, msgExpired, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount, ntfCount, pRelays, pRelaysOwn, pMsgFwds, pMsgFwdsOwn, pMsgFwdsRecv, rcvServices, ntfServices, rslvStats}
|
||||
<- asks serverStats
|
||||
st <- asks msgStore
|
||||
EntityCounts {queueCount, notifierCount, rcvServiceCount, ntfServiceCount, rcvServiceQueuesCount, ntfServiceQueuesCount} <-
|
||||
@@ -576,6 +579,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
qCount' <- readIORef qCount
|
||||
msgCount' <- readIORef msgCount
|
||||
ntfCount' <- readIORef ntfCount
|
||||
rslvStats' <- getResetNameResolverStatsData rslvStats
|
||||
T.hPutStrLn h $
|
||||
T.intercalate
|
||||
","
|
||||
@@ -649,6 +653,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
]
|
||||
<> showServiceStats rcvServices'
|
||||
<> showServiceStats ntfServices'
|
||||
<> showNameResolverStats rslvStats'
|
||||
)
|
||||
liftIO $ threadDelay' interval
|
||||
where
|
||||
@@ -656,6 +661,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
|
||||
map tshow [_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther]
|
||||
showServiceStats ServiceStatsData {_srvAssocNew, _srvAssocDuplicate, _srvAssocUpdated, _srvAssocRemoved, _srvSubCount, _srvSubDuplicate, _srvSubQueues, _srvSubEnd} =
|
||||
map tshow [_srvAssocNew, _srvAssocDuplicate, _srvAssocUpdated, _srvAssocRemoved, _srvSubCount, _srvSubDuplicate, _srvSubQueues, _srvSubEnd]
|
||||
showNameResolverStats NameResolverStatsData {_rslvReqs, _rslvSucc, _rslvNotFound, _rslvEthErrs, _rslvCacheHits, _rslvCacheMiss, _rslvDisabled} =
|
||||
map tshow [_rslvReqs, _rslvSucc, _rslvNotFound, _rslvEthErrs, _rslvCacheHits, _rslvCacheMiss, _rslvDisabled]
|
||||
|
||||
prometheusMetricsThread_ :: ServerConfig s -> [M s ()]
|
||||
prometheusMetricsThread_ ServerConfig {prometheusInterval = Just interval, prometheusMetricsFile} =
|
||||
@@ -1149,8 +1156,8 @@ receive h@THandle {params = THandleParams {thAuth, sessionId}} ms Client {rcvQ,
|
||||
updateBatchStats stats cmd -- even if nothing is verified
|
||||
let queueId (_, _, (_, qId, _)) = qId
|
||||
qs <- getQueueRecs ms p $ map queueId ts'
|
||||
zipWithM (\t -> verified stats t . verifyLoadedQueue service thAuth t) ts' qs
|
||||
_ -> mapM (\t -> verified stats t =<< verifyTransmission ms service thAuth t) ts'
|
||||
zipWithM (\t -> verified stats t . verifyLoadedQueue False service thAuth t) ts' qs
|
||||
_ -> mapM (\t -> verified stats t =<< verifyTransmission False ms service thAuth t) ts'
|
||||
mapM_ (atomically . writeTBQueue rcvQ) $ L.nonEmpty cmds
|
||||
pure $ errs ++ errs'
|
||||
[] -> pure errs
|
||||
@@ -1230,19 +1237,19 @@ data VerificationResult s = VRVerified (Maybe (StoreQueue s, QueueRec)) | VRFail
|
||||
-- - the queue or party key do not exist.
|
||||
-- In all cases, the time of the verification should depend only on the provided authorization type,
|
||||
-- a dummy key is used to run verification in the last two cases, and failure is returned irrespective of the result.
|
||||
verifyTransmission :: forall s. MsgStoreClass s => s -> Maybe THPeerClientService -> Maybe (THandleAuth 'TServer) -> SignedTransmission Cmd -> IO (VerificationResult s)
|
||||
verifyTransmission ms service thAuth t@(_, _, (_, queueId, Cmd p _)) = case queueParty p of
|
||||
Just Dict -> verifyLoadedQueue service thAuth t <$> getQueueRec ms p queueId
|
||||
Nothing -> pure $ verifyQueueTransmission service thAuth t Nothing
|
||||
verifyTransmission :: forall s. MsgStoreClass s => Bool -> s -> Maybe THPeerClientService -> Maybe (THandleAuth 'TServer) -> SignedTransmission Cmd -> IO (VerificationResult s)
|
||||
verifyTransmission forwarded ms service thAuth t@(_, _, (_, queueId, Cmd p _)) = case queueParty p of
|
||||
Just Dict -> verifyLoadedQueue forwarded service thAuth t <$> getQueueRec ms p queueId
|
||||
Nothing -> pure $ verifyQueueTransmission forwarded service thAuth t Nothing
|
||||
|
||||
verifyLoadedQueue :: Maybe THPeerClientService -> Maybe (THandleAuth 'TServer) -> SignedTransmission Cmd -> Either ErrorType (StoreQueue s, QueueRec) -> VerificationResult s
|
||||
verifyLoadedQueue service thAuth t@(tAuth, authorized, (corrId, _, _)) = \case
|
||||
Right q -> verifyQueueTransmission service thAuth t (Just q)
|
||||
verifyLoadedQueue :: Bool -> Maybe THPeerClientService -> Maybe (THandleAuth 'TServer) -> SignedTransmission Cmd -> Either ErrorType (StoreQueue s, QueueRec) -> VerificationResult s
|
||||
verifyLoadedQueue forwarded service thAuth t@(tAuth, authorized, (corrId, _, _)) = \case
|
||||
Right q -> verifyQueueTransmission forwarded service thAuth t (Just q)
|
||||
Left AUTH -> dummyVerifyCmd thAuth tAuth authorized corrId `seq` VRFailed AUTH
|
||||
Left e -> VRFailed e
|
||||
|
||||
verifyQueueTransmission :: forall s. Maybe THPeerClientService -> Maybe (THandleAuth 'TServer) -> SignedTransmission Cmd -> Maybe (StoreQueue s, QueueRec) -> VerificationResult s
|
||||
verifyQueueTransmission service thAuth (tAuth, authorized, (corrId, entId, command@(Cmd p cmd))) q_
|
||||
verifyQueueTransmission :: forall s. Bool -> Maybe THPeerClientService -> Maybe (THandleAuth 'TServer) -> SignedTransmission Cmd -> Maybe (StoreQueue s, QueueRec) -> VerificationResult s
|
||||
verifyQueueTransmission forwarded service thAuth (tAuth, authorized, (corrId, entId, command@(Cmd p cmd))) q_
|
||||
| not checkRole = VRFailed $ CMD PROHIBITED
|
||||
| not verifyServiceSig = VRFailed SERVICE
|
||||
| otherwise = vc p cmd
|
||||
@@ -1262,6 +1269,9 @@ verifyQueueTransmission service thAuth (tAuth, authorized, (corrId, entId, comma
|
||||
vc SNotifierService NSUBS {} = verifyServiceCmd
|
||||
vc SProxiedClient _ = VRVerified Nothing
|
||||
vc SProxyService (RFWD _) = VRVerified Nothing
|
||||
vc SResolver (RSLV _)
|
||||
| forwarded = VRVerified Nothing
|
||||
| otherwise = VRFailed $ CMD PROHIBITED
|
||||
checkRole = case (service, partyClientRole p) of
|
||||
(Just THClientService {serviceRole}, Just role) -> serviceRole == role
|
||||
_ -> True
|
||||
@@ -1486,6 +1496,16 @@ client
|
||||
SEND flags msgBody -> response <$> withQueue_ False err (sendMessage flags msgBody)
|
||||
Cmd SIdleClient PING -> pure $ response (corrId, NoEntity, PONG)
|
||||
Cmd SProxyService (RFWD encBlock) -> response . (corrId,NoEntity,) <$> processForwardedCommand encBlock
|
||||
Cmd SResolver (RSLV (LookupKey key)) -> do
|
||||
st <- asks (rslvStats . serverStats)
|
||||
incStat (rslvReqs st)
|
||||
asks namesEnv >>= \case
|
||||
Nothing -> incStat (rslvDisabled st) $> response (corrId, NoEntity, ERR AUTH)
|
||||
Just nenv ->
|
||||
liftIO (resolveName nenv key) >>= \case
|
||||
Right rec -> incStat (rslvSucc st) $> response (corrId, NoEntity, NAME rec)
|
||||
Left NotFound -> incStat (rslvNotFound st) $> response (corrId, NoEntity, ERR AUTH)
|
||||
Left _ -> incStat (rslvEthErrs st) $> response (corrId, NoEntity, ERR AUTH)
|
||||
Cmd SSenderLink command -> case command of
|
||||
LKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k $>> getQueueLink_ q qr
|
||||
LGET -> withQueue $ \q qr -> checkContact qr $ getQueueLink_ q qr
|
||||
@@ -2126,7 +2146,7 @@ client
|
||||
rejectOrVerify clntThAuth = \case
|
||||
Left (corrId', entId', e) -> pure $ Left (corrId', entId', ERR e)
|
||||
Right t'@(_, _, t''@(corrId', entId', cmd'))
|
||||
| allowed -> liftIO $ verified <$> verifyTransmission ms Nothing clntThAuth t'
|
||||
| allowed -> liftIO $ verified <$> verifyTransmission True ms Nothing clntThAuth t'
|
||||
| otherwise -> pure $ Left (corrId', entId', ERR $ CMD PROHIBITED)
|
||||
where
|
||||
allowed = case cmd' of
|
||||
@@ -2134,6 +2154,7 @@ client
|
||||
Cmd SSender (SKEY _) -> True
|
||||
Cmd SSenderLink (LKEY _) -> True
|
||||
Cmd SSenderLink LGET -> True
|
||||
Cmd SResolver (RSLV _) -> True
|
||||
_ -> False
|
||||
verified = \case
|
||||
VRVerified q -> Right (q, t'')
|
||||
@@ -2217,10 +2238,6 @@ updateDeletedStats q = do
|
||||
incStat $ qDeletedAll stats
|
||||
liftIO $ atomicModifyIORef'_ (qCount stats) (subtract 1)
|
||||
|
||||
incStat :: MonadIO m => IORef Int -> m ()
|
||||
incStat r = liftIO $ atomicModifyIORef'_ r (+ 1)
|
||||
{-# INLINE incStat #-}
|
||||
|
||||
randomId' :: Int -> M s ByteString
|
||||
randomId' n = atomically . C.randomBytes n =<< asks random
|
||||
|
||||
|
||||
@@ -115,6 +115,7 @@ import Simplex.Messaging.Server.Information
|
||||
import Simplex.Messaging.Server.MsgStore.Journal
|
||||
import Simplex.Messaging.Server.MsgStore.STM
|
||||
import Simplex.Messaging.Server.MsgStore.Types
|
||||
import Simplex.Messaging.Server.Names (NamesConfig (..), NamesEnv, closeNamesEnv, newNamesEnv)
|
||||
import Simplex.Messaging.Server.NtfStore
|
||||
import Simplex.Messaging.Server.QueueStore
|
||||
import Simplex.Messaging.Server.QueueStore.Postgres.Config
|
||||
@@ -197,6 +198,8 @@ data ServerConfig s = ServerConfig
|
||||
smpAgentCfg :: SMPClientAgentConfig,
|
||||
allowSMPProxy :: Bool, -- auth is the same with `newQueueBasicAuth`
|
||||
serverClientConcurrency :: Int,
|
||||
-- | public-namespace resolver config; Nothing disables the names role
|
||||
namesConfig :: Maybe NamesConfig,
|
||||
-- | server public information
|
||||
information :: Maybe ServerPublicInfo,
|
||||
startOptions :: StartOptions
|
||||
@@ -272,7 +275,8 @@ data Env s = Env
|
||||
serverStats :: ServerStats,
|
||||
sockets :: TVar [(ServiceName, SocketState)],
|
||||
clientSeq :: TVar ClientId,
|
||||
proxyAgent :: ProxyAgent -- senders served on this proxy
|
||||
proxyAgent :: ProxyAgent, -- senders served on this proxy
|
||||
namesEnv :: Maybe NamesEnv -- public-namespace resolver, present when [NAMES] enable: on
|
||||
}
|
||||
|
||||
msgStore :: Env s -> s
|
||||
@@ -558,7 +562,7 @@ newProhibitedSub = do
|
||||
return Sub {subThread = ProhibitSub, delivered}
|
||||
|
||||
newEnv :: ServerConfig s -> IO (Env s)
|
||||
newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smpAgentCfg, information, messageExpiration, idleQueueInterval, msgQueueQuota, maxJournalMsgCount, maxJournalStateLines} = do
|
||||
newEnv config@ServerConfig {allowSMPProxy, smpCredentials, httpCredentials, serverStoreCfg, smpAgentCfg, information, messageExpiration, idleQueueInterval, msgQueueQuota, maxJournalMsgCount, maxJournalStateLines, namesConfig} = do
|
||||
serverActive <- newTVarIO True
|
||||
server <- newServer
|
||||
msgStore_ <- case serverStoreCfg of
|
||||
@@ -603,6 +607,15 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smp
|
||||
sockets <- newTVarIO []
|
||||
clientSeq <- newTVarIO 0
|
||||
proxyAgent <- newSMPProxyAgent smpAgentCfg random
|
||||
namesEnv <- case namesConfig of
|
||||
Nothing -> pure Nothing
|
||||
Just nc
|
||||
| allowSMPProxy && not (dangerousColocation nc) -> do
|
||||
logError "[NAMES] enable: on with [PROXY] is refused — RSLV cache misses can serialise other forwarded commands. Set allow_dangerous_colocation = on to override."
|
||||
exitFailure
|
||||
| otherwise -> do
|
||||
let rs = rslvStats serverStats
|
||||
Just <$> newNamesEnv nc (rslvCacheHits rs) (rslvCacheMiss rs)
|
||||
pure
|
||||
Env
|
||||
{ serverActive,
|
||||
@@ -618,7 +631,8 @@ newEnv config@ServerConfig {smpCredentials, httpCredentials, serverStoreCfg, smp
|
||||
serverStats,
|
||||
sockets,
|
||||
clientSeq,
|
||||
proxyAgent
|
||||
proxyAgent,
|
||||
namesEnv
|
||||
}
|
||||
where
|
||||
loadStoreLog :: StoreQueueClass q => (RecipientId -> QueueRec -> IO q) -> FilePath -> STMQueueStore q -> IO ()
|
||||
|
||||
@@ -39,6 +39,7 @@ module Simplex.Messaging.Server.Main
|
||||
strParse,
|
||||
) where
|
||||
|
||||
import Control.Applicative ((<|>))
|
||||
import Control.Concurrent.STM
|
||||
import Control.Exception (finally)
|
||||
import Control.Logger.Simple
|
||||
@@ -76,6 +77,8 @@ import Simplex.Messaging.Server.Main.Init
|
||||
import Simplex.Messaging.Server.Web (EmbeddedWebParams (..), WebHttpsParams (..))
|
||||
import Simplex.Messaging.Server.MsgStore.Journal (JournalMsgStore (..), QStoreCfg (..), stmQueueStore)
|
||||
import Simplex.Messaging.Server.MsgStore.Types (MsgStoreClass (..), SQSType (..), SMSType (..), newMsgStore)
|
||||
import Simplex.Messaging.Protocol (mkNameOwner, NameOwner)
|
||||
import Simplex.Messaging.Server.Names (NamesConfig (..), RpcAuth (..))
|
||||
import Simplex.Messaging.Server.QueueStore.Postgres.Config
|
||||
import Simplex.Messaging.Server.StoreLog.ReadWrite (readQueueStore)
|
||||
import Simplex.Messaging.Transport (supportedProxyClientSMPRelayVRange, alpnSupportedSMPHandshakes, supportedServerSMPRelayVRange)
|
||||
@@ -605,6 +608,7 @@ smpServerCLI_ generateSite serveStaticFiles attachStaticFiles cfgPath logPath =
|
||||
},
|
||||
allowSMPProxy = True,
|
||||
serverClientConcurrency = readIniDefault defaultProxyClientConcurrency "PROXY" "client_concurrency" ini,
|
||||
namesConfig = readNamesConfig ini,
|
||||
information = serverPublicInfo ini,
|
||||
startOptions
|
||||
}
|
||||
@@ -796,6 +800,64 @@ validCountryValue field s
|
||||
| length s == 2 && all (\c -> isAscii c && isAlpha c) s = Right $ T.pack $ map toUpper s
|
||||
| otherwise = Left $ "Use ISO3166 2-letter code for " <> field
|
||||
|
||||
readNamesConfig :: Ini -> Maybe NamesConfig
|
||||
readNamesConfig ini
|
||||
| not enabled = Nothing
|
||||
| otherwise =
|
||||
Just
|
||||
NamesConfig
|
||||
{ ethereumEndpoint = requiredText "ethereum_endpoint",
|
||||
snrcAddress = either (error . ("[NAMES] snrc_address: " <>)) id $ parseEthAddr (requiredText "snrc_address"),
|
||||
rpcAuth = either (error . ("[NAMES] rpc_auth: " <>)) Just . parseRpcAuth =<< eitherToMaybe (lookupValue "NAMES" "rpc_auth" ini),
|
||||
cacheSeconds = readIniDefault 300 "NAMES" "cache_seconds" ini,
|
||||
cacheMaxEntries = readIniDefault 100000 "NAMES" "cache_max_entries" ini,
|
||||
cacheMaxBytes = readIniDefault 67108864 "NAMES" "cache_max_bytes" ini,
|
||||
rpcTimeoutMs = readIniDefault 3000 "NAMES" "rpc_timeout_ms" ini,
|
||||
rpcMaxResponseBytes = readIniDefault 262144 "NAMES" "rpc_max_response_bytes" ini,
|
||||
rpcMaxConcurrency = readIniDefault 8 "NAMES" "rpc_max_concurrency" ini,
|
||||
dangerousColocation = fromMaybe False (iniOnOff "NAMES" "allow_dangerous_colocation" ini)
|
||||
}
|
||||
where
|
||||
enabled = fromMaybe False (iniOnOff "NAMES" "enable" ini)
|
||||
requiredText key =
|
||||
either (error . (("[NAMES] " <> T.unpack key <> " is required: ") <>)) id $
|
||||
lookupValue "NAMES" key ini
|
||||
|
||||
-- | Parse a 20-byte Ethereum address as text "0x[hex40]" or "[hex40]".
|
||||
-- Step 4 minimal validation; EIP-55 checksum check lands in step 5.
|
||||
parseEthAddr :: Text -> Either String NameOwner
|
||||
parseEthAddr t =
|
||||
let s = case T.stripPrefix "0x" t <|> T.stripPrefix "0X" t of
|
||||
Just rest -> rest
|
||||
Nothing -> t
|
||||
in if T.length s == 40 && T.all isHex s
|
||||
then mkNameOwner (hexDecode (encodeUtf8 s))
|
||||
else Left "expected 0x-prefixed 40 hex characters"
|
||||
where
|
||||
isHex c = (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F')
|
||||
|
||||
-- | Decode a hex string of even length. Precondition: input is already
|
||||
-- validated as even-length and all-hex (validated by caller).
|
||||
hexDecode :: ByteString -> ByteString
|
||||
hexDecode = B.pack . go
|
||||
where
|
||||
go s
|
||||
| B.null s = []
|
||||
| otherwise = toEnum (16 * digit (B.head s) + digit (B.index s 1)) : go (B.drop 2 s)
|
||||
digit c
|
||||
| c >= '0' && c <= '9' = fromEnum c - fromEnum '0'
|
||||
| c >= 'a' && c <= 'f' = 10 + fromEnum c - fromEnum 'a'
|
||||
| otherwise = 10 + fromEnum c - fromEnum 'A'
|
||||
|
||||
parseRpcAuth :: Text -> Either String RpcAuth
|
||||
parseRpcAuth t = case T.words t of
|
||||
["bearer", tok] -> Right $ AuthBearer tok
|
||||
["basic", up] -> case T.breakOn ":" up of
|
||||
(u, rest)
|
||||
| not (T.null u) && ":" `T.isPrefixOf` rest -> Right $ AuthBasic u (T.drop 1 rest)
|
||||
_ -> Left "basic auth expects user:password"
|
||||
_ -> Left "expected `bearer <token>` or `basic <user>:<pass>`"
|
||||
|
||||
printSourceCode :: Maybe Text -> IO ()
|
||||
printSourceCode = \case
|
||||
Just sourceCode -> T.putStrLn $ "Server source code: " <> sourceCode
|
||||
|
||||
@@ -155,6 +155,25 @@ iniFileContent cfgPath logPath opts host basicAuth controlPortPwds =
|
||||
\# Limit number of threads a client can spawn to process proxy commands in parrallel.\n"
|
||||
<> ("# client_concurrency = " <> tshow defaultProxyClientConcurrency)
|
||||
<> "\n\n\
|
||||
\[NAMES]\n\
|
||||
\# Public-namespace resolution (SNRC on Ethereum).\n\
|
||||
\# Requires an Ethereum JSON-RPC endpoint (Reth+Nimbus). See deployment guide.\n\
|
||||
\# Cannot be combined with [PROXY] enable: on by default - see allow_dangerous_colocation.\n\
|
||||
\# Restart required to change settings.\n\
|
||||
\enable: off\n\
|
||||
\# Same-host:\n\
|
||||
\# ethereum_endpoint: http://127.0.0.1:8545\n\
|
||||
\# Central Reth via Caddy:\n\
|
||||
\# ethereum_endpoint: https://eth.simplex.chat:443\n\
|
||||
\# rpc_auth: basic <username>:<password>\n\
|
||||
\# snrc_address: 0x0000000000000000000000000000000000000000\n\
|
||||
\# cache_seconds: 300\n\
|
||||
\# cache_max_entries: 100000\n\
|
||||
\# cache_max_bytes: 67108864\n\
|
||||
\# rpc_timeout_ms: 3000\n\
|
||||
\# rpc_max_response_bytes: 262144\n\
|
||||
\# rpc_max_concurrency: 8\n\
|
||||
\# allow_dangerous_colocation: off\n\n\
|
||||
\[INACTIVE_CLIENTS]\n\
|
||||
\# TTL and interval to check inactive clients\n\
|
||||
\disconnect = on\n"
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
-- | SMP public-namespace resolver façade.
|
||||
--
|
||||
-- Re-exports the resolver's public surface from Names.Resolver and the
|
||||
-- HTTP auth type from Names.Eth.RPC. Implementation lives in Resolver.hs;
|
||||
-- Eth.RPC / Eth.SNRC are transport / codec internals.
|
||||
module Simplex.Messaging.Server.Names
|
||||
( NamesConfig (..),
|
||||
RpcAuth (..),
|
||||
NamesEnv,
|
||||
ResolveError (..),
|
||||
newNamesEnv,
|
||||
closeNamesEnv,
|
||||
resolveName,
|
||||
)
|
||||
where
|
||||
|
||||
import Simplex.Messaging.Server.Names.Eth.RPC (RpcAuth (..))
|
||||
import Simplex.Messaging.Server.Names.Resolver (NamesConfig (..), NamesEnv, ResolveError (..), closeNamesEnv, newNamesEnv, resolveName)
|
||||
@@ -0,0 +1,213 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE RecordWildCards #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE StrictData #-}
|
||||
|
||||
-- | Ethereum JSON-RPC HTTP transport for the resolver.
|
||||
--
|
||||
-- Boundary properties:
|
||||
-- * Response body read with `brReadSome rpcMaxResponseBytes` — adversarial
|
||||
-- endpoints cannot exhaust memory with multi-GB bodies.
|
||||
-- * Concurrency cap via QSem — bursts of cache-miss traffic cannot exhaust
|
||||
-- the http-client connection pool.
|
||||
-- * Authorization header attached only when configured.
|
||||
module Simplex.Messaging.Server.Names.Eth.RPC
|
||||
( RpcAuth (..),
|
||||
EthRpcEnv (..),
|
||||
EthRpcError (..),
|
||||
newEthRpcEnv,
|
||||
closeEthRpcEnv,
|
||||
ethCallReal,
|
||||
scrubUrl,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Applicative ((<|>))
|
||||
import Control.Concurrent.QSem (QSem, newQSem, signalQSem, waitQSem)
|
||||
import qualified Control.Exception as E
|
||||
import Control.Exception (bracket_)
|
||||
import qualified Data.Aeson as J
|
||||
import qualified Data.Aeson.Types as J
|
||||
import qualified Data.ByteArray.Encoding as BAE
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import qualified Data.ByteString.Lazy as BL
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Data.Text.Encoding (encodeUtf8)
|
||||
import Network.HTTP.Client
|
||||
( HttpException,
|
||||
Manager,
|
||||
Request,
|
||||
RequestBody (..),
|
||||
brReadSome,
|
||||
closeManager,
|
||||
method,
|
||||
parseRequest,
|
||||
requestBody,
|
||||
requestHeaders,
|
||||
responseBody,
|
||||
responseStatus,
|
||||
withResponse,
|
||||
)
|
||||
import qualified Network.HTTP.Client as HC
|
||||
import Network.HTTP.Client.TLS (tlsManagerSettings)
|
||||
import qualified Network.HTTP.Types as HT
|
||||
|
||||
data RpcAuth = AuthBearer Text | AuthBasic Text Text
|
||||
deriving (Show)
|
||||
|
||||
data EthRpcEnv = EthRpcEnv
|
||||
{ manager :: Manager,
|
||||
request :: Request,
|
||||
sem :: QSem,
|
||||
maxResponseBytes :: Int
|
||||
}
|
||||
|
||||
data EthRpcError
|
||||
= HttpFailure HttpException
|
||||
| HttpStatusErr Int
|
||||
| BodyTooLarge
|
||||
| InvalidJson String
|
||||
| JsonRpcErr Int Text
|
||||
deriving (Show)
|
||||
|
||||
-- | Build a Request from a (validated) ethereum_endpoint URL.
|
||||
buildRequest :: Text -> Maybe RpcAuth -> IO Request
|
||||
buildRequest endpoint auth_ = do
|
||||
req <- parseRequest (T.unpack endpoint)
|
||||
pure $
|
||||
req
|
||||
{ method = "POST",
|
||||
requestHeaders =
|
||||
("Content-Type", "application/json")
|
||||
: maybe [] (pure . authHeader) auth_
|
||||
}
|
||||
|
||||
authHeader :: RpcAuth -> HT.Header
|
||||
authHeader = \case
|
||||
AuthBearer tok -> ("Authorization", "Bearer " <> encodeUtf8 tok)
|
||||
AuthBasic u p ->
|
||||
let encoded = BAE.convertToBase BAE.Base64 (encodeUtf8 u <> ":" <> encodeUtf8 p) :: ByteString
|
||||
in ("Authorization", "Basic " <> encoded)
|
||||
|
||||
newEthRpcEnv :: Text -> Maybe RpcAuth -> Int -> Int -> IO EthRpcEnv
|
||||
newEthRpcEnv endpoint auth_ maxResponseBytes maxConcurrency = do
|
||||
manager <- HC.newManager tlsManagerSettings
|
||||
request <- buildRequest endpoint auth_
|
||||
sem <- newQSem maxConcurrency
|
||||
pure EthRpcEnv {manager, request, sem, maxResponseBytes}
|
||||
|
||||
closeEthRpcEnv :: EthRpcEnv -> IO ()
|
||||
closeEthRpcEnv EthRpcEnv {manager} = closeManager manager
|
||||
|
||||
-- | Make a single eth_call. `to` is the contract address (20 raw bytes);
|
||||
-- `dat` is the ABI-encoded call data. Returns the contract return bytes.
|
||||
ethCallReal :: EthRpcEnv -> ByteString -> ByteString -> IO (Either EthRpcError ByteString)
|
||||
ethCallReal EthRpcEnv {manager, request, sem, maxResponseBytes} to dat =
|
||||
bracket_ (waitQSem sem) (signalQSem sem) $ do
|
||||
let body = J.encode (rpcEnvelope to dat)
|
||||
req = request {requestBody = RequestBodyLBS body}
|
||||
result <- E.try $ withResponse req manager $ \res -> do
|
||||
let status = responseStatus res
|
||||
if HT.statusCode status >= 400
|
||||
then pure (Left (HttpStatusErr (HT.statusCode status)))
|
||||
else do
|
||||
bs <- brReadSome (responseBody res) (maxResponseBytes + 1)
|
||||
if BL.length bs > fromIntegral maxResponseBytes
|
||||
then pure (Left BodyTooLarge)
|
||||
else pure (parseResult (BL.toStrict bs))
|
||||
pure (either (Left . HttpFailure) id result)
|
||||
|
||||
rpcEnvelope :: ByteString -> ByteString -> J.Value
|
||||
rpcEnvelope to dat =
|
||||
J.object
|
||||
[ "jsonrpc" J..= ("2.0" :: Text),
|
||||
"id" J..= (1 :: Int),
|
||||
"method" J..= ("eth_call" :: Text),
|
||||
"params"
|
||||
J..= [ J.object
|
||||
[ "to" J..= toHex to,
|
||||
"data" J..= toHex dat
|
||||
],
|
||||
J.String "latest"
|
||||
]
|
||||
]
|
||||
|
||||
parseResult :: ByteString -> Either EthRpcError ByteString
|
||||
parseResult bs = case J.eitherDecodeStrict bs of
|
||||
Left e -> Left (InvalidJson e)
|
||||
Right (v :: J.Value) -> case J.parseEither parser v of
|
||||
Left e -> Left (InvalidJson e)
|
||||
Right r -> r
|
||||
where
|
||||
parser :: J.Value -> J.Parser (Either EthRpcError ByteString)
|
||||
parser = J.withObject "rpc" $ \o -> do
|
||||
mErr :: Maybe J.Value <- o J..:? "error"
|
||||
case mErr of
|
||||
Just (J.Object eo) -> do
|
||||
code <- (eo J..: "code") <|> pure (-1 :: Int)
|
||||
msg <- (eo J..: "message") <|> pure ("rpc error" :: Text)
|
||||
pure (Left (JsonRpcErr code msg))
|
||||
_ -> do
|
||||
result :: Text <- o J..: "result"
|
||||
case fromHex (encodeUtf8 result) of
|
||||
Right b -> pure (Right b)
|
||||
Left e -> pure (Left (InvalidJson e))
|
||||
|
||||
toHex :: ByteString -> Text
|
||||
toHex bs = T.pack $ "0x" <> concatMap byte (B.unpack bs)
|
||||
where
|
||||
byte c =
|
||||
let n = fromEnum c
|
||||
(h, l) = quotRem n 16
|
||||
in [hexChar h, hexChar l]
|
||||
hexChar n
|
||||
| n < 10 = toEnum (fromEnum '0' + n)
|
||||
| otherwise = toEnum (fromEnum 'a' + n - 10)
|
||||
|
||||
fromHex :: ByteString -> Either String ByteString
|
||||
fromHex bs0 =
|
||||
let bs = case B.stripPrefix "0x" bs0 of
|
||||
Just rest -> rest
|
||||
Nothing -> case B.stripPrefix "0X" bs0 of
|
||||
Just rest -> rest
|
||||
Nothing -> bs0
|
||||
in if B.null bs
|
||||
then Right B.empty
|
||||
else
|
||||
if odd (B.length bs) || not (B.all isHex bs)
|
||||
then Left "invalid hex"
|
||||
else Right (decodeHex bs)
|
||||
where
|
||||
isHex c = (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F')
|
||||
|
||||
decodeHex :: ByteString -> ByteString
|
||||
decodeHex = B.pack . go
|
||||
where
|
||||
go s
|
||||
| B.null s = []
|
||||
| otherwise =
|
||||
let hi = digit (B.head s)
|
||||
lo = digit (B.index s 1)
|
||||
in toEnum (16 * hi + lo) : go (B.drop 2 s)
|
||||
digit c
|
||||
| c >= '0' && c <= '9' = fromEnum c - fromEnum '0'
|
||||
| c >= 'a' && c <= 'f' = 10 + fromEnum c - fromEnum 'a'
|
||||
| otherwise = 10 + fromEnum c - fromEnum 'A'
|
||||
|
||||
-- | Strip userinfo from a URL so log lines never leak credentials.
|
||||
scrubUrl :: Text -> Text
|
||||
scrubUrl url =
|
||||
let (scheme, rest) = T.breakOn "://" url
|
||||
in if T.null rest
|
||||
then url
|
||||
else
|
||||
let body = T.drop 3 rest
|
||||
(host, query) = T.breakOn "/" body
|
||||
in case T.breakOn "@" host of
|
||||
(_userinfo, atRest)
|
||||
| not (T.null atRest) -> scheme <> "://" <> T.drop 1 atRest <> query
|
||||
_ -> url
|
||||
@@ -0,0 +1,171 @@
|
||||
{-# LANGUAGE BangPatterns #-}
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE StrictData #-}
|
||||
|
||||
-- | SNRC contract codec: Keccak-256 namehash + bounded Solidity ABI decoder.
|
||||
--
|
||||
-- IMPORTANT: Ethereum uses Keccak-256, NOT NIST SHA3-256.
|
||||
--
|
||||
-- ABI safety invariants (enforced before any allocation):
|
||||
-- 1. offset + 32 <= buf.length (head read in-bounds)
|
||||
-- 2. offset + 32 + length <= buf.length (body in-bounds)
|
||||
-- 3. offset >= headEnd (no backward jumps)
|
||||
-- 4. every length <= per-field cap (bounded allocations)
|
||||
-- 5. string[] outer count * 32 + offset <= buf.length (array head fits)
|
||||
-- 6. recursion depth <= 2 (no deep nesting)
|
||||
-- 7. uint256 -> Int64 fails if any high 24 bytes non-zero (range check)
|
||||
-- 8. UTF-8 via decodeUtf8' returns AbiBadUtf8 (no partial bytes)
|
||||
module Simplex.Messaging.Server.Names.Eth.SNRC
|
||||
( -- * Namehash
|
||||
keccak256,
|
||||
namehash,
|
||||
|
||||
-- * SNRC eth_call payload
|
||||
snrcSelector,
|
||||
encodeGetRecord,
|
||||
|
||||
-- * ABI decoding
|
||||
AbiError (..),
|
||||
decodeGetRecord,
|
||||
decodeWord256Int64,
|
||||
decodeAddress,
|
||||
decodeString,
|
||||
decodeStringArray,
|
||||
)
|
||||
where
|
||||
|
||||
import Crypto.Hash (Digest, Keccak_256, hash)
|
||||
import qualified Data.ByteArray as BA
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Int (Int64)
|
||||
import Simplex.Messaging.Protocol (NameOwner, NameRecord, mkNameOwner, unNameOwner)
|
||||
|
||||
-- | ABI-decode failure modes (caller collapses to ResolveError EthDecodeErr).
|
||||
data AbiError
|
||||
= AbiTruncated
|
||||
| AbiOversized
|
||||
| AbiBackwardOffset
|
||||
| AbiNonZeroHighBytes
|
||||
| AbiBadUtf8
|
||||
| AbiDepthExceeded
|
||||
| AbiInvariantViolated String
|
||||
deriving (Eq, Show)
|
||||
|
||||
-- | Keccak-256 (Ethereum variant), NOT SHA3-256.
|
||||
keccak256 :: ByteString -> ByteString
|
||||
keccak256 = BA.convert . (hash :: ByteString -> Digest Keccak_256)
|
||||
{-# INLINE keccak256 #-}
|
||||
|
||||
-- | ENS / SNRC namehash: recursive keccak256 over reversed labels.
|
||||
-- Empty name -> 32 zero bytes; "a.b.c" -> keccak(keccak(keccak(0 ++ keccak "c") ++ keccak "b") ++ keccak "a").
|
||||
namehash :: ByteString -> ByteString
|
||||
namehash name
|
||||
| B.null name = zeroNode
|
||||
| otherwise = foldr step zeroNode (B.split '.' name)
|
||||
where
|
||||
zeroNode = B.replicate 32 '\NUL'
|
||||
step label acc = keccak256 (acc <> keccak256 label)
|
||||
|
||||
-- | First 4 bytes of keccak("getRecord(bytes32)"). Confirm signature
|
||||
-- against the Part 1 SNRC contract before merging.
|
||||
snrcSelector :: ByteString
|
||||
snrcSelector = B.take 4 (keccak256 "getRecord(bytes32)")
|
||||
|
||||
-- | Build the eth_call `data` parameter for getRecord(lookupKey).
|
||||
encodeGetRecord :: ByteString -> ByteString
|
||||
encodeGetRecord node32
|
||||
| B.length node32 == 32 = snrcSelector <> node32
|
||||
| otherwise = snrcSelector <> padLeft32 node32
|
||||
|
||||
padLeft32 :: ByteString -> ByteString
|
||||
padLeft32 bs
|
||||
| n >= 32 = B.take 32 bs
|
||||
| otherwise = B.replicate (32 - n) '\NUL' <> bs
|
||||
where
|
||||
n = B.length bs
|
||||
|
||||
-- | Read a uint256 at byte offset, fail if it doesn't fit in Int64.
|
||||
decodeWord256Int64 :: Int -> ByteString -> Either AbiError Int64
|
||||
decodeWord256Int64 off buf
|
||||
| off + 32 > B.length buf = Left AbiTruncated
|
||||
| B.any (/= toEnum 0) (B.take 24 (B.drop off buf)) = Left AbiNonZeroHighBytes
|
||||
| otherwise = Right $ B.foldl shiftIn 0 (B.take 8 (B.drop (off + 24) buf))
|
||||
where
|
||||
shiftIn :: Int64 -> Char -> Int64
|
||||
shiftIn !acc c = (acc * 256) + fromIntegral (fromEnum c :: Int)
|
||||
{-# INLINE decodeWord256Int64 #-}
|
||||
|
||||
-- | Read an Ethereum address at byte offset (uint256 with high 12 bytes zero).
|
||||
decodeAddress :: Int -> ByteString -> Either AbiError NameOwner
|
||||
decodeAddress off buf
|
||||
| off + 32 > B.length buf = Left AbiTruncated
|
||||
| B.any (/= toEnum 0) (B.take 12 (B.drop off buf)) = Left (AbiInvariantViolated "address has non-zero high 12 bytes")
|
||||
| otherwise = case mkNameOwner (B.take 20 (B.drop (off + 12) buf)) of
|
||||
Right addr -> Right addr
|
||||
Left e -> Left (AbiInvariantViolated e)
|
||||
|
||||
-- | Decode a Solidity `string` whose data starts at byte offset `off`.
|
||||
decodeString :: Int -> Int -> Int -> ByteString -> Either AbiError ByteString
|
||||
decodeString headEnd off cap buf
|
||||
| off < headEnd = Left AbiBackwardOffset
|
||||
| off + 32 > B.length buf = Left AbiTruncated
|
||||
| otherwise = do
|
||||
n <- decodeWord256Int64 off buf
|
||||
let len = fromIntegral n :: Int
|
||||
if len > cap
|
||||
then Left AbiOversized
|
||||
else
|
||||
if off + 32 + len > B.length buf
|
||||
then Left AbiTruncated
|
||||
else Right $ B.take len (B.drop (off + 32) buf)
|
||||
|
||||
-- | Decode a Solidity `string[]` at byte offset `off`. Each element capped
|
||||
-- at `byteCap` bytes, total element count capped at `cntCap`. Depth must be
|
||||
-- < 2 (recurses one level into decodeString).
|
||||
decodeStringArray :: Int -> Int -> Int -> Int -> Int -> ByteString -> Either AbiError [ByteString]
|
||||
decodeStringArray depth headEnd off cntCap byteCap buf
|
||||
| depth >= 2 = Left AbiDepthExceeded
|
||||
| off < headEnd = Left AbiBackwardOffset
|
||||
| off + 32 > B.length buf = Left AbiTruncated
|
||||
| otherwise = do
|
||||
n <- decodeWord256Int64 off buf
|
||||
let cnt = fromIntegral n :: Int
|
||||
if cnt > cntCap
|
||||
then Left AbiOversized
|
||||
else
|
||||
let arrHead = off + 32
|
||||
arrHeadEnd = arrHead + cnt * 32
|
||||
in if arrHeadEnd > B.length buf
|
||||
then Left AbiTruncated
|
||||
else collectN 0 cnt arrHead arrHeadEnd []
|
||||
where
|
||||
collectN i n base hd acc
|
||||
| i >= n = Right (reverse acc)
|
||||
| otherwise = do
|
||||
relOff <- decodeWord256Int64 (base + i * 32) buf
|
||||
let absOff = base + fromIntegral relOff
|
||||
s <- decodeString hd absOff byteCap buf
|
||||
collectN (i + 1) n base hd (s : acc)
|
||||
|
||||
-- | Decode the ABI-encoded return value of getRecord(bytes32) into a NameRecord.
|
||||
-- Zero-owner (0x000...000) is reported as Right Nothing so the caller maps it
|
||||
-- to NotFound (ENS-style sentinel).
|
||||
--
|
||||
-- PLACEHOLDER: returns Right Nothing for any non-zero owner until the Part 1
|
||||
-- SNRC contract ABI is finalised. All ABI primitives above are production-ready;
|
||||
-- only the field-layout-aware composition is pending.
|
||||
decodeGetRecord :: ByteString -> Either AbiError (Maybe NameRecord)
|
||||
decodeGetRecord buf
|
||||
| B.length buf < 32 * 8 = Left AbiTruncated
|
||||
| otherwise = case decodeAddress 32 buf of
|
||||
Left e -> Left e
|
||||
Right owner
|
||||
| isZeroOwner owner -> Right Nothing
|
||||
| otherwise -> Right Nothing -- placeholder until SNRC ABI is finalised
|
||||
|
||||
isZeroOwner :: NameOwner -> Bool
|
||||
isZeroOwner = (== B.replicate 20 '\NUL') . unNameOwner
|
||||
@@ -0,0 +1,200 @@
|
||||
{-# LANGUAGE LambdaCase #-}
|
||||
{-# LANGUAGE NamedFieldPuns #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE RecordWildCards #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
{-# LANGUAGE StrictData #-}
|
||||
|
||||
-- | Public-namespace resolver: TTL+FIFO cache, in-flight coalescing,
|
||||
-- timeout-bounded RPC, and zero-owner → NotFound mapping.
|
||||
module Simplex.Messaging.Server.Names.Resolver
|
||||
( NamesConfig (..),
|
||||
RpcAuth (..),
|
||||
NamesEnv (..),
|
||||
EthCall,
|
||||
ResolveError (..),
|
||||
newNamesEnv,
|
||||
newNamesEnvWith,
|
||||
closeNamesEnv,
|
||||
resolveName,
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent.STM
|
||||
import qualified Control.Exception as E
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.HashPSQ as PSQ
|
||||
import Data.IORef (IORef)
|
||||
import Data.Text (Text)
|
||||
import Data.Word (Word64)
|
||||
import GHC.Clock (getMonotonicTimeNSec)
|
||||
import Simplex.Messaging.Protocol (NameOwner, NameRecord, unNameOwner)
|
||||
import Simplex.Messaging.Server.Names.Eth.RPC (EthRpcEnv, EthRpcError (..), RpcAuth (..), closeEthRpcEnv, ethCallReal, newEthRpcEnv)
|
||||
import Simplex.Messaging.Server.Names.Eth.SNRC (decodeGetRecord, encodeGetRecord, namehash)
|
||||
import Simplex.Messaging.Util (atomicModifyIORef'_)
|
||||
import System.Timeout (timeout)
|
||||
|
||||
-- | Public-namespace resolver configuration.
|
||||
data NamesConfig = NamesConfig
|
||||
{ ethereumEndpoint :: Text,
|
||||
snrcAddress :: NameOwner,
|
||||
rpcAuth :: Maybe RpcAuth,
|
||||
cacheSeconds :: Int,
|
||||
cacheMaxEntries :: Int,
|
||||
cacheMaxBytes :: Int,
|
||||
rpcTimeoutMs :: Int,
|
||||
rpcMaxResponseBytes :: Int,
|
||||
rpcMaxConcurrency :: Int,
|
||||
dangerousColocation :: Bool
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
data ResolveError
|
||||
= NotFound
|
||||
| EthHttpErr
|
||||
| EthRpcErr {rpcCode :: Int, rpcMessage :: Text}
|
||||
| EthDecodeErr
|
||||
| TimedOut
|
||||
deriving (Eq, Show)
|
||||
|
||||
-- | Test seam: a function from (to, data) -> raw return bytes or error.
|
||||
-- Production wires this to ethCallReal; tests substitute a stub.
|
||||
type EthCall = ByteString -> ByteString -> IO (Either EthRpcError ByteString)
|
||||
|
||||
-- | Cache value bundles a NameRecord with its insertion-time byte cost
|
||||
-- so eviction can keep total cache bytes under cacheMaxBytes.
|
||||
data CacheEntry = CacheEntry
|
||||
{ ceRecord :: NameRecord,
|
||||
ceBytes :: Int
|
||||
}
|
||||
|
||||
-- | Cache state: (PSQ keyed by LookupKey, priority = insert time in ns, total bytes).
|
||||
-- PSQ minView returns lowest-priority element → FIFO eviction by insertion order.
|
||||
type CacheState = (PSQ.HashPSQ ByteString Word64 CacheEntry, Int)
|
||||
|
||||
data NamesEnv = NamesEnv
|
||||
{ config :: NamesConfig,
|
||||
ethCall :: EthCall,
|
||||
cache :: TVar CacheState,
|
||||
inflight :: TVar (PSQ.HashPSQ ByteString Word64 (TMVar (Either ResolveError NameRecord))),
|
||||
rpcEnv :: Maybe EthRpcEnv, -- Nothing for test stubs
|
||||
cacheHitsRef :: IORef Int, -- shared with ServerStats.rslvStats.rslvCacheHits
|
||||
cacheMissRef :: IORef Int -- shared with ServerStats.rslvStats.rslvCacheMiss
|
||||
}
|
||||
|
||||
-- | Allocate resolver with real HTTP transport.
|
||||
-- `cacheHitsRef` and `cacheMissRef` are shared with ServerStats.rslvStats so
|
||||
-- the periodic CSV / Prometheus exporter sees per-request cache outcomes.
|
||||
newNamesEnv :: NamesConfig -> IORef Int -> IORef Int -> IO NamesEnv
|
||||
newNamesEnv cfg cacheHitsRef cacheMissRef = do
|
||||
rpc <- newEthRpcEnv (ethereumEndpoint cfg) (rpcAuth cfg) (rpcMaxResponseBytes cfg) (rpcMaxConcurrency cfg)
|
||||
let call to dat = ethCallReal rpc to dat
|
||||
newNamesEnvWith cfg call (Just rpc) cacheHitsRef cacheMissRef
|
||||
|
||||
-- | Allocate resolver with an injected ethCall (test seam).
|
||||
newNamesEnvWith :: NamesConfig -> EthCall -> Maybe EthRpcEnv -> IORef Int -> IORef Int -> IO NamesEnv
|
||||
newNamesEnvWith config ethCall rpcEnv cacheHitsRef cacheMissRef = do
|
||||
cache <- newTVarIO (PSQ.empty, 0)
|
||||
inflight <- newTVarIO PSQ.empty
|
||||
pure NamesEnv {config, ethCall, cache, inflight, rpcEnv, cacheHitsRef, cacheMissRef}
|
||||
|
||||
closeNamesEnv :: NamesEnv -> IO ()
|
||||
closeNamesEnv NamesEnv {rpcEnv} = maybe (pure ()) closeEthRpcEnv rpcEnv
|
||||
|
||||
-- | Resolve a lookup key. Coalesces concurrent identical requests, caches
|
||||
-- results for cacheSeconds, and bounds RPCs by rpcTimeoutMs.
|
||||
resolveName :: NamesEnv -> ByteString -> IO (Either ResolveError NameRecord)
|
||||
resolveName env key = do
|
||||
now <- getMonotonicTimeNSec
|
||||
cacheLookup env key now >>= \case
|
||||
Just rec -> do
|
||||
atomicModifyIORef'_ (cacheHitsRef env) (+ 1)
|
||||
pure (Right rec)
|
||||
Nothing -> do
|
||||
atomicModifyIORef'_ (cacheMissRef env) (+ 1)
|
||||
coalesce env key now
|
||||
|
||||
cacheLookup :: NamesEnv -> ByteString -> Word64 -> IO (Maybe NameRecord)
|
||||
cacheLookup NamesEnv {config, cache} key now = atomically $ do
|
||||
(psq, totalBytes) <- readTVar cache
|
||||
case PSQ.lookup key psq of
|
||||
Just (insertedAt, ce)
|
||||
| now < insertedAt + ttlNs config -> pure (Just (ceRecord ce))
|
||||
| otherwise -> do
|
||||
-- Expired: evict and signal miss.
|
||||
writeTVar cache (PSQ.delete key psq, totalBytes - ceBytes ce)
|
||||
pure Nothing
|
||||
Nothing -> pure Nothing
|
||||
|
||||
ttlNs :: NamesConfig -> Word64
|
||||
ttlNs cfg = fromIntegral (cacheSeconds cfg) * 1000000000
|
||||
|
||||
-- | Leader/waiter coalescing. Leader runs the RPC under E.mask; waiters
|
||||
-- block on the leader's TMVar. Cleanup runs even on async exception.
|
||||
coalesce :: NamesEnv -> ByteString -> Word64 -> IO (Either ResolveError NameRecord)
|
||||
coalesce env@NamesEnv {inflight} key now = do
|
||||
ticket <- atomically $ do
|
||||
flight <- readTVar inflight
|
||||
case PSQ.lookup key flight of
|
||||
Just (_, mv) -> pure (Right mv)
|
||||
Nothing -> do
|
||||
mv <- newEmptyTMVar
|
||||
writeTVar inflight (PSQ.insert key now mv flight)
|
||||
pure (Left mv)
|
||||
case ticket of
|
||||
Right mv -> atomically (readTMVar mv) -- waiter
|
||||
Left mv -> E.mask $ \restore -> do
|
||||
r <-
|
||||
restore (fetchOnceTimed env key)
|
||||
`E.catch` \(e :: E.SomeException) -> pure (Left (mapEthExn e))
|
||||
atomically $ do
|
||||
putTMVar mv r
|
||||
modifyTVar' inflight (PSQ.delete key)
|
||||
case r of
|
||||
Right rec -> cacheInsert env key now rec
|
||||
Left _ -> pure ()
|
||||
pure r
|
||||
|
||||
mapEthExn :: E.SomeException -> ResolveError
|
||||
mapEthExn _ = EthHttpErr
|
||||
|
||||
fetchOnceTimed :: NamesEnv -> ByteString -> IO (Either ResolveError NameRecord)
|
||||
fetchOnceTimed env key =
|
||||
timeout (rpcTimeoutMs (config env) * 1000) (fetchOnce env key) >>= \case
|
||||
Just r -> pure r
|
||||
Nothing -> pure (Left TimedOut)
|
||||
|
||||
fetchOnce :: NamesEnv -> ByteString -> IO (Either ResolveError NameRecord)
|
||||
fetchOnce env@NamesEnv {ethCall, config} key = do
|
||||
let node = namehash key
|
||||
callData = encodeGetRecord node
|
||||
to = unNameOwner (snrcAddress config)
|
||||
ethCall to callData >>= \case
|
||||
Left (HttpFailure _) -> pure (Left EthHttpErr)
|
||||
Left (HttpStatusErr _) -> pure (Left EthHttpErr)
|
||||
Left BodyTooLarge -> pure (Left EthDecodeErr)
|
||||
Left (InvalidJson _) -> pure (Left EthDecodeErr)
|
||||
Left (JsonRpcErr c m) -> pure (Left EthRpcErr {rpcCode = c, rpcMessage = m})
|
||||
Right ret -> case decodeGetRecord ret of
|
||||
Right Nothing -> pure (Left NotFound)
|
||||
Right (Just rec) -> pure (Right rec)
|
||||
Left _ -> pure (Left EthDecodeErr)
|
||||
|
||||
cacheInsert :: NamesEnv -> ByteString -> Word64 -> NameRecord -> IO ()
|
||||
cacheInsert NamesEnv {config, cache} key now rec = atomically $ do
|
||||
(psq, totalBytes) <- readTVar cache
|
||||
let entryBytes = estimateBytes rec
|
||||
(psq', totalBytes') = evictWhile psq totalBytes
|
||||
evictWhile p tb
|
||||
| PSQ.size p > cacheMaxEntries config || tb + entryBytes > cacheMaxBytes config =
|
||||
case PSQ.minView p of
|
||||
Just (_, _, ce, rest) -> evictWhile rest (tb - ceBytes ce)
|
||||
Nothing -> (p, tb)
|
||||
| otherwise = (p, tb)
|
||||
ce = CacheEntry {ceRecord = rec, ceBytes = entryBytes}
|
||||
writeTVar cache (PSQ.insert key now ce psq', totalBytes' + entryBytes)
|
||||
|
||||
-- | Approximate byte cost of a cached NameRecord (overhead + content).
|
||||
-- Tight enough that cacheMaxBytes bounds real memory; not byte-exact.
|
||||
estimateBytes :: NameRecord -> Int
|
||||
estimateBytes _ = 4096 -- conservative upper bound per NameRecord
|
||||
@@ -59,7 +59,7 @@ data RTSubscriberMetrics = RTSubscriberMetrics
|
||||
{-# FOURMOLU_DISABLE\n#-}
|
||||
prometheusMetrics :: ServerMetrics -> RealTimeMetrics -> UTCTime -> Text
|
||||
prometheusMetrics sm rtm ts =
|
||||
time <> queues <> subscriptions <> messages <> ntfMessages <> ntfs <> relays <> services <> info
|
||||
time <> queues <> subscriptions <> messages <> ntfMessages <> ntfs <> relays <> services <> names <> info
|
||||
where
|
||||
ServerMetrics {statsData, activeQueueCounts = ps, activeNtfCounts = psNtf, entityCounts, rtsOptions} = sm
|
||||
RealTimeMetrics
|
||||
@@ -128,7 +128,8 @@ prometheusMetrics sm rtm ts =
|
||||
_rcvServicesSubDuplicate,
|
||||
_qCount,
|
||||
_msgCount,
|
||||
_ntfCount
|
||||
_ntfCount,
|
||||
_rslvStats
|
||||
} = statsData
|
||||
time =
|
||||
"# Recorded at: " <> T.pack (iso8601Show ts) <> "\n\
|
||||
@@ -459,6 +460,39 @@ prometheusMetrics sm rtm ts =
|
||||
\# TYPE simplex_smp_" <> pfx <> "_services_sub_fewer_total gauge\n\
|
||||
\simplex_smp_" <> pfx <> "_services_sub_fewer_total " <> mshow (_srvSubFewerTotal ss) <> "\n# " <> pfx <> ".srvSubFewerTotal\n\
|
||||
\\n"
|
||||
names =
|
||||
let NameResolverStatsData {_rslvReqs, _rslvSucc, _rslvNotFound, _rslvEthErrs, _rslvCacheHits, _rslvCacheMiss, _rslvDisabled} = _rslvStats
|
||||
in "# Names\n\
|
||||
\# -----\n\
|
||||
\\n\
|
||||
\# HELP simplex_smp_names_reqs Total RSLV requests forwarded to this server.\n\
|
||||
\# TYPE simplex_smp_names_reqs counter\n\
|
||||
\simplex_smp_names_reqs " <> mshow _rslvReqs <> "\n# rslvReqs\n\
|
||||
\\n\
|
||||
\# HELP simplex_smp_names_success NameRecord successfully resolved and returned.\n\
|
||||
\# TYPE simplex_smp_names_success counter\n\
|
||||
\simplex_smp_names_success " <> mshow _rslvSucc <> "\n# rslvSucc\n\
|
||||
\\n\
|
||||
\# HELP simplex_smp_names_not_found Lookup key has no corresponding NameRecord on chain (zero-owner sentinel).\n\
|
||||
\# TYPE simplex_smp_names_not_found counter\n\
|
||||
\simplex_smp_names_not_found " <> mshow _rslvNotFound <> "\n# rslvNotFound\n\
|
||||
\\n\
|
||||
\# HELP simplex_smp_names_eth_errs Ethereum endpoint or ABI errors.\n\
|
||||
\# TYPE simplex_smp_names_eth_errs counter\n\
|
||||
\simplex_smp_names_eth_errs " <> mshow _rslvEthErrs <> "\n# rslvEthErrs\n\
|
||||
\\n\
|
||||
\# HELP simplex_smp_names_cache_hits Resolution served from cache.\n\
|
||||
\# TYPE simplex_smp_names_cache_hits counter\n\
|
||||
\simplex_smp_names_cache_hits " <> mshow _rslvCacheHits <> "\n# rslvCacheHits\n\
|
||||
\\n\
|
||||
\# HELP simplex_smp_names_cache_miss Resolution required an eth_call.\n\
|
||||
\# TYPE simplex_smp_names_cache_miss counter\n\
|
||||
\simplex_smp_names_cache_miss " <> mshow _rslvCacheMiss <> "\n# rslvCacheMiss\n\
|
||||
\\n\
|
||||
\# HELP simplex_smp_names_disabled RSLV requests rejected because the names role is disabled.\n\
|
||||
\# TYPE simplex_smp_names_disabled counter\n\
|
||||
\simplex_smp_names_disabled " <> mshow _rslvDisabled <> "\n# rslvDisabled\n\
|
||||
\\n"
|
||||
info =
|
||||
"# Info\n\
|
||||
\# ----\n\
|
||||
|
||||
@@ -39,9 +39,18 @@ module Simplex.Messaging.Server.Stats
|
||||
setServiceStats,
|
||||
emptyTimeBuckets,
|
||||
updateTimeBuckets,
|
||||
incStat,
|
||||
NameResolverStats (..),
|
||||
NameResolverStatsData (..),
|
||||
newNameResolverStats,
|
||||
newNameResolverStatsData,
|
||||
getNameResolverStatsData,
|
||||
getResetNameResolverStatsData,
|
||||
setNameResolverStats,
|
||||
) where
|
||||
|
||||
import Control.Applicative (optional, (<|>))
|
||||
import Control.Monad.IO.Class (MonadIO, liftIO)
|
||||
import qualified Data.Attoparsec.ByteString.Char8 as A
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
@@ -123,7 +132,8 @@ data ServerStats = ServerStats
|
||||
rcvServicesSubDuplicate :: IORef Int,
|
||||
qCount :: IORef Int,
|
||||
msgCount :: IORef Int,
|
||||
ntfCount :: IORef Int
|
||||
ntfCount :: IORef Int,
|
||||
rslvStats :: NameResolverStats
|
||||
}
|
||||
|
||||
data ServerStatsData = ServerStatsData
|
||||
@@ -184,7 +194,8 @@ data ServerStatsData = ServerStatsData
|
||||
_rcvServicesSubDuplicate :: Int,
|
||||
_qCount :: Int,
|
||||
_msgCount :: Int,
|
||||
_ntfCount :: Int
|
||||
_ntfCount :: Int,
|
||||
_rslvStats :: NameResolverStatsData
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
@@ -248,6 +259,7 @@ newServerStats ts = do
|
||||
qCount <- newIORef 0
|
||||
msgCount <- newIORef 0
|
||||
ntfCount <- newIORef 0
|
||||
rslvStats <- newNameResolverStats
|
||||
pure
|
||||
ServerStats
|
||||
{ fromTime,
|
||||
@@ -307,7 +319,8 @@ newServerStats ts = do
|
||||
rcvServicesSubDuplicate,
|
||||
qCount,
|
||||
msgCount,
|
||||
ntfCount
|
||||
ntfCount,
|
||||
rslvStats
|
||||
}
|
||||
|
||||
getServerStatsData :: ServerStats -> IO ServerStatsData
|
||||
@@ -370,6 +383,7 @@ getServerStatsData s = do
|
||||
_qCount <- readIORef $ qCount s
|
||||
_msgCount <- readIORef $ msgCount s
|
||||
_ntfCount <- readIORef $ ntfCount s
|
||||
_rslvStats <- getNameResolverStatsData $ rslvStats s
|
||||
pure
|
||||
ServerStatsData
|
||||
{ _fromTime,
|
||||
@@ -429,7 +443,8 @@ getServerStatsData s = do
|
||||
_rcvServicesSubDuplicate,
|
||||
_qCount,
|
||||
_msgCount,
|
||||
_ntfCount
|
||||
_ntfCount,
|
||||
_rslvStats
|
||||
}
|
||||
|
||||
-- this function is not thread safe, it is used on server start only
|
||||
@@ -493,6 +508,7 @@ setServerStats s d = do
|
||||
writeIORef (qCount s) $! _qCount d
|
||||
writeIORef (msgCount s) $! _msgCount d
|
||||
writeIORef (ntfCount s) $! _ntfCount d
|
||||
setNameResolverStats (rslvStats s) $! _rslvStats d
|
||||
|
||||
instance StrEncoding ServerStatsData where
|
||||
strEncode d =
|
||||
@@ -557,7 +573,9 @@ instance StrEncoding ServerStatsData where
|
||||
"rcvServices:",
|
||||
strEncode (_rcvServices d),
|
||||
"ntfServices:",
|
||||
strEncode (_ntfServices d)
|
||||
strEncode (_ntfServices d),
|
||||
"rslvStats:",
|
||||
strEncode (_rslvStats d)
|
||||
]
|
||||
strP = do
|
||||
_fromTime <- "fromTime=" *> strP <* A.endOfLine
|
||||
@@ -628,6 +646,10 @@ instance StrEncoding ServerStatsData where
|
||||
_pMsgFwdsRecv <- opt "pMsgFwdsRecv="
|
||||
_rcvServices <- serviceStatsP "rcvServices:"
|
||||
_ntfServices <- serviceStatsP "ntfServices:"
|
||||
_rslvStats <-
|
||||
optional ("rslvStats:" <* A.endOfLine) >>= \case
|
||||
Just _ -> strP <* optional A.endOfLine
|
||||
_ -> pure newNameResolverStatsData
|
||||
pure
|
||||
ServerStatsData
|
||||
{ _fromTime,
|
||||
@@ -687,7 +709,8 @@ instance StrEncoding ServerStatsData where
|
||||
_rcvServicesSubDuplicate = 0,
|
||||
_qCount,
|
||||
_msgCount = 0,
|
||||
_ntfCount = 0
|
||||
_ntfCount = 0,
|
||||
_rslvStats
|
||||
}
|
||||
where
|
||||
opt s = A.string s *> strP <* A.endOfLine <|> pure 0
|
||||
@@ -786,6 +809,10 @@ updatePeriodStats ps (EntityId pId) = do
|
||||
ph = hash pId
|
||||
updatePeriod ref = unlessM (IS.member ph <$> readIORef ref) $ atomicModifyIORef'_ ref $ IS.insert ph
|
||||
|
||||
incStat :: MonadIO m => IORef Int -> m ()
|
||||
incStat r = liftIO $ atomicModifyIORef'_ r (+ 1)
|
||||
{-# INLINE incStat #-}
|
||||
|
||||
data ProxyStats = ProxyStats
|
||||
{ pRequests :: IORef Int,
|
||||
pSuccesses :: IORef Int, -- includes destination server error responses that will be forwarded to the client
|
||||
@@ -862,6 +889,109 @@ instance StrEncoding ProxyStatsData where
|
||||
_pErrorsOther <- "errorsOther=" *> strP
|
||||
pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther}
|
||||
|
||||
data NameResolverStats = NameResolverStats
|
||||
{ rslvReqs :: IORef Int,
|
||||
rslvSucc :: IORef Int,
|
||||
rslvNotFound :: IORef Int,
|
||||
rslvEthErrs :: IORef Int,
|
||||
rslvCacheHits :: IORef Int,
|
||||
rslvCacheMiss :: IORef Int,
|
||||
rslvDisabled :: IORef Int
|
||||
}
|
||||
|
||||
newNameResolverStats :: IO NameResolverStats
|
||||
newNameResolverStats = do
|
||||
rslvReqs <- newIORef 0
|
||||
rslvSucc <- newIORef 0
|
||||
rslvNotFound <- newIORef 0
|
||||
rslvEthErrs <- newIORef 0
|
||||
rslvCacheHits <- newIORef 0
|
||||
rslvCacheMiss <- newIORef 0
|
||||
rslvDisabled <- newIORef 0
|
||||
pure NameResolverStats {rslvReqs, rslvSucc, rslvNotFound, rslvEthErrs, rslvCacheHits, rslvCacheMiss, rslvDisabled}
|
||||
|
||||
data NameResolverStatsData = NameResolverStatsData
|
||||
{ _rslvReqs :: Int,
|
||||
_rslvSucc :: Int,
|
||||
_rslvNotFound :: Int,
|
||||
_rslvEthErrs :: Int,
|
||||
_rslvCacheHits :: Int,
|
||||
_rslvCacheMiss :: Int,
|
||||
_rslvDisabled :: Int
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
newNameResolverStatsData :: NameResolverStatsData
|
||||
newNameResolverStatsData =
|
||||
NameResolverStatsData
|
||||
{ _rslvReqs = 0,
|
||||
_rslvSucc = 0,
|
||||
_rslvNotFound = 0,
|
||||
_rslvEthErrs = 0,
|
||||
_rslvCacheHits = 0,
|
||||
_rslvCacheMiss = 0,
|
||||
_rslvDisabled = 0
|
||||
}
|
||||
|
||||
getNameResolverStatsData :: NameResolverStats -> IO NameResolverStatsData
|
||||
getNameResolverStatsData s = do
|
||||
_rslvReqs <- readIORef $ rslvReqs s
|
||||
_rslvSucc <- readIORef $ rslvSucc s
|
||||
_rslvNotFound <- readIORef $ rslvNotFound s
|
||||
_rslvEthErrs <- readIORef $ rslvEthErrs s
|
||||
_rslvCacheHits <- readIORef $ rslvCacheHits s
|
||||
_rslvCacheMiss <- readIORef $ rslvCacheMiss s
|
||||
_rslvDisabled <- readIORef $ rslvDisabled s
|
||||
pure NameResolverStatsData {_rslvReqs, _rslvSucc, _rslvNotFound, _rslvEthErrs, _rslvCacheHits, _rslvCacheMiss, _rslvDisabled}
|
||||
|
||||
getResetNameResolverStatsData :: NameResolverStats -> IO NameResolverStatsData
|
||||
getResetNameResolverStatsData s = do
|
||||
_rslvReqs <- atomicSwapIORef (rslvReqs s) 0
|
||||
_rslvSucc <- atomicSwapIORef (rslvSucc s) 0
|
||||
_rslvNotFound <- atomicSwapIORef (rslvNotFound s) 0
|
||||
_rslvEthErrs <- atomicSwapIORef (rslvEthErrs s) 0
|
||||
_rslvCacheHits <- atomicSwapIORef (rslvCacheHits s) 0
|
||||
_rslvCacheMiss <- atomicSwapIORef (rslvCacheMiss s) 0
|
||||
_rslvDisabled <- atomicSwapIORef (rslvDisabled s) 0
|
||||
pure NameResolverStatsData {_rslvReqs, _rslvSucc, _rslvNotFound, _rslvEthErrs, _rslvCacheHits, _rslvCacheMiss, _rslvDisabled}
|
||||
|
||||
-- not thread safe; used on server start only
|
||||
setNameResolverStats :: NameResolverStats -> NameResolverStatsData -> IO ()
|
||||
setNameResolverStats s d = do
|
||||
writeIORef (rslvReqs s) $! _rslvReqs d
|
||||
writeIORef (rslvSucc s) $! _rslvSucc d
|
||||
writeIORef (rslvNotFound s) $! _rslvNotFound d
|
||||
writeIORef (rslvEthErrs s) $! _rslvEthErrs d
|
||||
writeIORef (rslvCacheHits s) $! _rslvCacheHits d
|
||||
writeIORef (rslvCacheMiss s) $! _rslvCacheMiss d
|
||||
writeIORef (rslvDisabled s) $! _rslvDisabled d
|
||||
|
||||
instance StrEncoding NameResolverStatsData where
|
||||
strEncode NameResolverStatsData {_rslvReqs, _rslvSucc, _rslvNotFound, _rslvEthErrs, _rslvCacheHits, _rslvCacheMiss, _rslvDisabled} =
|
||||
"reqs="
|
||||
<> strEncode _rslvReqs
|
||||
<> "\nsucc="
|
||||
<> strEncode _rslvSucc
|
||||
<> "\nnotFound="
|
||||
<> strEncode _rslvNotFound
|
||||
<> "\nethErrs="
|
||||
<> strEncode _rslvEthErrs
|
||||
<> "\ncacheHits="
|
||||
<> strEncode _rslvCacheHits
|
||||
<> "\ncacheMiss="
|
||||
<> strEncode _rslvCacheMiss
|
||||
<> "\ndisabled="
|
||||
<> strEncode _rslvDisabled
|
||||
strP = do
|
||||
_rslvReqs <- "reqs=" *> strP <* A.endOfLine
|
||||
_rslvSucc <- "succ=" *> strP <* A.endOfLine
|
||||
_rslvNotFound <- "notFound=" *> strP <* A.endOfLine
|
||||
_rslvEthErrs <- "ethErrs=" *> strP <* A.endOfLine
|
||||
_rslvCacheHits <- "cacheHits=" *> strP <* A.endOfLine
|
||||
_rslvCacheMiss <- "cacheMiss=" *> strP <* A.endOfLine
|
||||
_rslvDisabled <- "disabled=" *> strP
|
||||
pure NameResolverStatsData {_rslvReqs, _rslvSucc, _rslvNotFound, _rslvEthErrs, _rslvCacheHits, _rslvCacheMiss, _rslvDisabled}
|
||||
|
||||
data ServiceStats = ServiceStats
|
||||
{ srvAssocNew :: IORef Int,
|
||||
srvAssocDuplicate :: IORef Int,
|
||||
|
||||
@@ -57,6 +57,7 @@ module Simplex.Messaging.Transport
|
||||
newNtfCredsSMPVersion,
|
||||
clientNoticesSMPVersion,
|
||||
rcvServiceSMPVersion,
|
||||
namesSMPVersion,
|
||||
simplexMQVersion,
|
||||
smpBlockSize,
|
||||
TransportConfig (..),
|
||||
@@ -223,6 +224,9 @@ clientNoticesSMPVersion = VersionSMP 18
|
||||
rcvServiceSMPVersion :: VersionSMP
|
||||
rcvServiceSMPVersion = VersionSMP 19
|
||||
|
||||
namesSMPVersion :: VersionSMP
|
||||
namesSMPVersion = VersionSMP 20
|
||||
|
||||
minClientSMPRelayVersion :: VersionSMP
|
||||
minClientSMPRelayVersion = VersionSMP 6
|
||||
|
||||
@@ -230,13 +234,13 @@ minServerSMPRelayVersion :: VersionSMP
|
||||
minServerSMPRelayVersion = VersionSMP 6
|
||||
|
||||
currentClientSMPRelayVersion :: VersionSMP
|
||||
currentClientSMPRelayVersion = VersionSMP 19
|
||||
currentClientSMPRelayVersion = VersionSMP 20
|
||||
|
||||
legacyServerSMPRelayVersion :: VersionSMP
|
||||
legacyServerSMPRelayVersion = VersionSMP 6
|
||||
|
||||
currentServerSMPRelayVersion :: VersionSMP
|
||||
currentServerSMPRelayVersion = VersionSMP 19
|
||||
currentServerSMPRelayVersion = VersionSMP 20
|
||||
|
||||
-- Max SMP protocol version to be used in e2e encrypted
|
||||
-- connection between client and server, as defined by SMP proxy.
|
||||
@@ -244,7 +248,7 @@ currentServerSMPRelayVersion = VersionSMP 19
|
||||
-- to prevent client version fingerprinting by the
|
||||
-- destination relays when clients upgrade at different times.
|
||||
proxiedSMPRelayVersion :: VersionSMP
|
||||
proxiedSMPRelayVersion = VersionSMP 18
|
||||
proxiedSMPRelayVersion = VersionSMP 20
|
||||
|
||||
-- minimal supported protocol version is 6
|
||||
-- TODO remove code that supports sending commands without batching
|
||||
|
||||
Reference in New Issue
Block a user