smp server: refactor subscriptions and delivery in order to always response SOK on subscription with an optional message to follow. (#1573)

* smp server: refactor subscriptions and delivery

* metric for time between MSG and ACK

* cleanup

* refactor pattern match for ghc 8.10.7

* time buckets

* split max time metric

* histogram

* fix
This commit is contained in:
Evgeny
2025-07-12 14:18:38 +01:00
committed by GitHub
parent 99e59b73a3
commit 62733ef4c1
10 changed files with 275 additions and 197 deletions

View File

@@ -1224,8 +1224,8 @@ okSMPCommands cmd c nm qs = L.map process <$> sendProtocolCommands c nm cs
Left e -> Left e
-- | Send SMP command
sendSMPCommand :: PartyI p => SMPClient -> NetworkRequestMode -> Maybe C.APrivateAuthKey -> QueueId -> Command p -> ExceptT SMPClientError IO BrokerMsg
sendSMPCommand c nm pKey qId cmd = sendProtocolCommand c nm pKey qId (Cmd sParty cmd)
sendSMPCommand :: PartyI p => SMPClient -> NetworkRequestMode -> Maybe C.APrivateAuthKey -> EntityId -> Command p -> ExceptT SMPClientError IO BrokerMsg
sendSMPCommand c nm pKey entId cmd = sendProtocolCommand c nm pKey entId (Cmd sParty cmd)
{-# INLINE sendSMPCommand #-}
type PCTransmission err msg = (Either TransportError SentRawTransmission, Request err msg)

View File

@@ -114,6 +114,7 @@ module Simplex.Messaging.Protocol
BasicAuth (..),
SrvLoc (..),
CorrId (..),
pattern NoCorrId,
EntityId (..),
pattern NoEntity,
QueueId,
@@ -1370,6 +1371,9 @@ newtype CorrId = CorrId {bs :: ByteString}
deriving (Eq, Ord, Show)
deriving newtype (Encoding)
pattern NoCorrId :: CorrId
pattern NoCorrId = CorrId ""
instance IsString CorrId where
fromString = CorrId . fromString
{-# INLINE fromString #-}

View File

@@ -61,17 +61,18 @@ import qualified Data.ByteString.Lazy.Char8 as LB
import Data.Constraint (Dict (..))
import Data.Dynamic (toDyn)
import Data.Either (fromRight, partitionEithers)
import Data.Foldable (foldrM)
import Data.Functor (($>))
import Data.IORef
import Data.Int (Int64)
import qualified Data.IntMap.Strict as IM
import qualified Data.IntSet as IS
import Data.List (foldl', intercalate, mapAccumR)
import Data.List (foldl', intercalate)
import Data.List.NonEmpty (NonEmpty (..), (<|))
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing)
import Data.Maybe (fromMaybe, isJust, isNothing)
import Data.Semigroup (Sum (..))
import qualified Data.Set as S
import Data.Text (Text)
@@ -271,6 +272,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
as'' <- if prevServiceId == serviceId_ then pure [] else endServiceSub prevServiceId qId END
case serviceId_ of
Just serviceId -> do
modifyTVar' totalServiceSubs (+ 1) -- server count for all services
as <- endQueueSub qId END
as' <- cancelServiceSubs serviceId =<< upsertSubscribedClient serviceId c serviceSubscribers
pure $ as ++ as' ++ as''
@@ -282,8 +284,9 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
as <- endQueueSub qId DELD
as' <- endServiceSub serviceId qId DELD
pure $ as ++ as'
CSService serviceId -> do
CSService serviceId count -> do
modifyTVar' subClients $ IS.insert clntId -- add ID to server's subscribed cients
modifyTVar' totalServiceSubs (+ count) -- server count for all services
cancelServiceSubs serviceId =<< upsertSubscribedClient serviceId c serviceSubscribers
updateSubDisconnected = case clntSub of
-- do not insert client if it is already disconnected, but send END/DELD to any other client subscribed to this queue or service
@@ -296,7 +299,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
as <- endQueueSub qId DELD
as' <- endServiceSub serviceId qId DELD
pure $ as ++ as'
CSService serviceId -> cancelServiceSubs serviceId =<< lookupSubscribedClient serviceId serviceSubscribers
CSService serviceId _ -> cancelServiceSubs serviceId =<< lookupSubscribedClient serviceId serviceSubscribers
endQueueSub :: QueueId -> BrokerMsg -> STM [PrevClientSub s]
endQueueSub qId msg = prevSub qId msg (CSAEndSub qId) =<< lookupDeleteSubscribedClient qId queueSubscribers
endServiceSub :: Maybe ServiceId -> QueueId -> BrokerMsg -> STM [PrevClientSub s]
@@ -382,7 +385,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
let cancelNtfs s = throwSTM $ userError $ s <> ", " <> show (length ts_) <> " ntfs kept"
unlessM (currentClient readTVar c) $ cancelNtfs "not current client"
whenM (isFullTBQueue sndQ) $ cancelNtfs "sending queue full"
writeTBQueue sndQ ts
writeTBQueue sndQ (ts, [])
pure $ length ts_
currentClient :: Monad m => (forall a. TVar a -> m a) -> Client s' -> m Bool
currentClient rd Client {clientId, connected} = (&&) <$> rd connected <*> (IS.member clientId <$> rd subClients)
@@ -393,7 +396,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
ntfs -> do
writeTVar v []
pure $ foldl' (\acc' ntf -> nmsg nId ntf : acc') acc ntfs -- reverses, to order by time
nmsg nId MsgNtf {ntfNonce, ntfEncMeta} = (CorrId "", nId, NMSG ntfNonce ntfEncMeta)
nmsg nId MsgNtf {ntfNonce, ntfEncMeta} = (NoCorrId, nId, NMSG ntfNonce ntfEncMeta)
updateNtfStats :: Client s' -> Either SomeException Int -> IO ()
updateNtfStats Client {clientId} = \case
Right 0 -> pure ()
@@ -419,14 +422,14 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
where
enqueueEvts evts c@Client {connected, sndQ} =
whenM (readTVarIO connected) $ do
sent <- atomically $ tryWriteTBQueue sndQ ts
sent <- atomically $ tryWriteTBQueue sndQ (ts, [])
if sent
then updateEndStats
else -- if queue is full it can block
forkClient c ("sendPendingEvtsThread.queueEvts") $
atomically (writeTBQueue sndQ ts) >> updateEndStats
forkClient c "sendPendingEvtsThread.queueEvts" $
atomically (writeTBQueue sndQ (ts, [])) >> updateEndStats
where
ts = L.map (\(entId, evt) -> (CorrId "", entId, evt)) evts
ts = L.map (\(entId, evt) -> (NoCorrId, entId, evt)) evts
-- this accounts for both END and DELD events
updateEndStats = do
let len = L.length evts
@@ -703,7 +706,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
then (metrics {subsCount = subsCount metrics + cnt, subClientsCount = subClientsCount metrics + 1}, times')
else acc
countSubs acc@(!cnt, times) Sub {delivered} = do
delivered_ <- atomically $ tryReadTMVar delivered
delivered_ <- readTVarIO delivered
pure $ case delivered_ of
Nothing -> acc
Just (_, ts) -> (cnt + 1, updateTimeBuckets ts ts' times)
@@ -1095,7 +1098,7 @@ receive h@THandle {params = THandleParams {thAuth, sessionId}} ms Client {rcvQ,
atomically . (writeTVar rcvActiveAt $!) =<< getSystemTime
let (es, ts') = partitionEithers $ L.toList ts
errs = map (second ERR) es
case ts' of
errs' <- case ts' of
(_, _, (_, _, Cmd p cmd)) : rest -> do
let service = peerClientService =<< thAuth
(errs', cmds) <- partitionEithers <$> case batchParty p of
@@ -1105,9 +1108,10 @@ receive h@THandle {params = THandleParams {thAuth, sessionId}} ms Client {rcvQ,
qs <- getQueueRecs ms p $ map queueId ts'
zipWithM (\t -> verified stats t . verifyLoadedQueue service thAuth t) ts' qs
_ -> mapM (\t -> verified stats t =<< verifyTransmission ms service thAuth t) ts'
write rcvQ cmds
write sndQ $ errs ++ errs'
[] -> write sndQ errs
mapM_ (atomically . writeTBQueue rcvQ) $ L.nonEmpty cmds
pure $ errs ++ errs'
[] -> pure errs
mapM_ (atomically . writeTBQueue sndQ . (,[])) $ L.nonEmpty errs'
where
sameParty :: SParty p -> SignedTransmission Cmd -> Bool
sameParty p (_, _, (_, _, Cmd p' _)) = isJust $ testEquality p p'
@@ -1129,32 +1133,27 @@ receive h@THandle {params = THandleParams {thAuth, sessionId}} ms Client {rcvQ,
NSUB -> incStat $ ntfSubAuth stats
GET -> incStat $ msgGetAuth stats
_ -> pure ()
write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty
send :: Transport c => MVar (THandleSMP c 'TServer) -> Client s -> IO ()
send th c@Client {sndQ, msgQ, clientTHParams = THandleParams {sessionId}} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " send"
forever $ atomically (readTBQueue sndQ) >>= sendTransmissions
where
sendTransmissions :: NonEmpty (Transmission BrokerMsg) -> IO ()
sendTransmissions ts
| L.length ts <= 2 = tSend th c ts
-- If the request had batched subscriptions
-- this will reply SOKs to all SUBs in the first batched transmission,
-- to reduce client timeouts.
-- After that all messages will be sent in separate transmissions,
-- without any client response timeouts, and allowing them to interleave
-- with other requests responses.
sendTransmissions :: (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg]) -> IO ()
sendTransmissions (ts, []) = tSend th c ts
sendTransmissions (ts, msg : msgs)
| length ts <= 4 = do -- up to 4 SOKs can be in one block with MSG (see testBatchSubResponses test)
tSend th c $ ts <> [msg]
mapM_ (atomically . writeTBQueue msgQ) $ L.nonEmpty msgs
| otherwise = do
let (msgs_, ts') = mapAccumR splitMessages [] ts
-- If the request had batched subscriptions and L.length ts > 2
-- this will reply OK to all SUBs in the first batched transmission,
-- to reduce client timeouts.
tSend th c ts'
-- After that all messages will be sent in separate transmissions,
-- without any client response timeouts, and allowing them to interleave
-- with other requests responses.
mapM_ (atomically . writeTBQueue msgQ) $ L.nonEmpty msgs_
where
splitMessages :: [Transmission BrokerMsg] -> Transmission BrokerMsg -> ([Transmission BrokerMsg], Transmission BrokerMsg)
splitMessages msgs t@(corrId, entId, cmd) = case cmd of
-- replace MSG response with OK, accumulating MSG in a separate list.
MSG {} -> ((CorrId "", entId, cmd) : msgs, (corrId, entId, OK))
_ -> (msgs, t)
tSend th c ts
atomically $ writeTBQueue msgQ (msg :| msgs)
sendMsg :: Transport c => MVar (THandleSMP c 'TServer) -> Client s -> IO ()
sendMsg th c@Client {msgQ, clientTHParams = THandleParams {sessionId}} = do
@@ -1200,7 +1199,7 @@ verifyLoadedQueue service thAuth t@(tAuth, authorized, (corrId, _, _)) = \case
Left e -> VRFailed e
verifyQueueTransmission :: forall s. Maybe THPeerClientService -> Maybe (THandleAuth 'TServer) -> SignedTransmission Cmd -> Maybe (StoreQueue s, QueueRec) -> VerificationResult s
verifyQueueTransmission service thAuth (tAuth, authorized, (corrId, _, command@(Cmd p cmd))) q_
verifyQueueTransmission service thAuth (tAuth, authorized, (corrId, entId, command@(Cmd p cmd))) q_
| not checkRole = VRFailed $ CMD PROHIBITED
| not verifyServiceSig = VRFailed SERVICE
| otherwise = vc p cmd
@@ -1226,8 +1225,8 @@ verifyQueueTransmission service thAuth (tAuth, authorized, (corrId, _, command@(
verify = verifyCmdAuthorization thAuth tAuth authorized' corrId
verifyServiceCmd :: VerificationResult s
verifyServiceCmd = case (service, tAuth) of
(Just THClientService {serviceKey = k}, Just (TASignature (C.ASignature C.SEd25519 s), Nothing))
| C.verify' k s authorized -> VRVerified Nothing
(Just THClientService {serviceId, serviceKey = k}, Just (TASignature (C.ASignature C.SEd25519 s), Nothing))
| entId == serviceId && C.verify' k s authorized -> VRVerified Nothing
_ -> VRFailed SERVICE
-- this function verify service signature for commands that use it in service sessions
verifyServiceSig
@@ -1321,19 +1320,20 @@ client
-- TODO [certs rcv] rcv subscriptions
Server {subscribers, ntfSubscribers}
ms
clnt@Client {clientId, subscriptions, ntfSubscriptions, ntfServiceSubscribed, serviceSubsCount = _todo', ntfServiceSubsCount, rcvQ, sndQ, clientTHParams = thParams'@THandleParams {sessionId}, procThreads} = do
clnt@Client {clientId, ntfSubscriptions, ntfServiceSubscribed, serviceSubsCount = _todo', ntfServiceSubsCount, rcvQ, sndQ, clientTHParams = thParams'@THandleParams {sessionId}, procThreads} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands"
let THandleParams {thVersion} = thParams'
service = peerClientService =<< thAuth thParams'
clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams')
process t acc@(rs, msgs) =
(maybe acc (\(!r, !msg_) -> (r : rs, maybe msgs (: msgs) msg_)))
<$> processCommand clntServiceId thVersion t
forever $
atomically (readTBQueue rcvQ)
>>= mapM (processCommand service thVersion)
>>= mapM_ reply . L.nonEmpty . catMaybes . L.toList
>>= foldrM process ([], [])
>>= \(rs_, msgs) -> mapM_ (atomically . writeTBQueue sndQ . (,msgs)) (L.nonEmpty rs_)
where
reply :: MonadIO m => NonEmpty (Transmission BrokerMsg) -> m ()
reply = atomically . writeTBQueue sndQ
processProxiedCmd :: Transmission (Command 'ProxiedClient) -> M s (Maybe (Transmission BrokerMsg))
processProxiedCmd (corrId, EntityId sessId, command) = (corrId,EntityId sessId,) <$$> case command of
processProxiedCmd :: Transmission (Command 'ProxiedClient) -> M s (Maybe ResponseAndMessage)
processProxiedCmd (corrId, EntityId sessId, command) = (\t -> ((corrId, EntityId sessId, t), Nothing)) <$$> case command of
PRXY srv auth -> ifM allowProxy getRelay (pure $ Just $ ERR $ PROXY BASIC_AUTH)
where
allowProxy = do
@@ -1396,7 +1396,7 @@ client
forkProxiedCmd cmdAction = do
bracket_ wait signal . forkClient clnt (B.unpack $ "client $" <> encode sessionId <> " proxy") $ do
-- commands MUST be processed under a reasonable timeout or the client would halt
cmdAction >>= \t -> reply [(corrId, EntityId sessId, t)]
cmdAction >>= \t -> atomically $ writeTBQueue sndQ ([(corrId, EntityId sessId, t)], [])
pure Nothing
where
wait = do
@@ -1412,32 +1412,32 @@ client
mkIncProxyStats ps psOwn own sel = do
incStat $ sel ps
when own $ incStat $ sel psOwn
processCommand :: Maybe THPeerClientService -> VersionSMP -> VerifiedTransmission s -> M s (Maybe (Transmission BrokerMsg))
processCommand service clntVersion (q_, (corrId, entId, cmd)) = case cmd of
processCommand :: Maybe ServiceId -> VersionSMP -> VerifiedTransmission s -> M s (Maybe ResponseAndMessage)
processCommand clntServiceId clntVersion (q_, (corrId, entId, cmd)) = case cmd of
Cmd SProxiedClient command -> processProxiedCmd (corrId, entId, command)
Cmd SSender command -> Just <$> case command of
Cmd SSender command -> case command of
SKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k
SEND flags msgBody -> withQueue_ False $ sendMessage flags msgBody
Cmd SIdleClient PING -> pure $ Just (corrId, NoEntity, PONG)
Cmd SProxyService (RFWD encBlock) -> Just . (corrId,NoEntity,) <$> processForwardedCommand encBlock
Cmd SSenderLink command -> Just <$> case command of
SEND flags msgBody -> response <$> withQueue_ False err (sendMessage flags msgBody)
Cmd SIdleClient PING -> pure $ response (corrId, NoEntity, PONG)
Cmd SProxyService (RFWD encBlock) -> response . (corrId,NoEntity,) <$> processForwardedCommand encBlock
Cmd SSenderLink command -> case command of
LKEY k -> withQueue $ \q qr -> checkMode QMMessaging qr $ secureQueue_ q k $>> getQueueLink_ q qr
LGET -> withQueue $ \q qr -> checkContact qr $ getQueueLink_ q qr
Cmd SNotifier NSUB -> Just . (corrId,entId,) <$> case q_ of
Cmd SNotifier NSUB -> response . (corrId,entId,) <$> case q_ of
Just (q, QueueRec {notifier = Just ntfCreds}) -> subscribeNotifications q ntfCreds
_ -> pure $ ERR INTERNAL
Cmd SNotifierService NSUBS -> Just . (corrId,entId,) <$> case service of
Just s -> subscribeServiceNotifications s
Cmd SNotifierService NSUBS -> response . (corrId,entId,) <$> case clntServiceId of
Just serviceId -> subscribeServiceNotifications serviceId
Nothing -> pure $ ERR INTERNAL
Cmd SCreator (NEW nqr@NewQueueReq {auth_}) ->
Just <$> ifM allowNew (createQueue nqr) (pure (corrId, entId, ERR AUTH))
response <$> ifM allowNew (createQueue nqr) (pure (corrId, entId, ERR AUTH))
where
allowNew = do
ServerConfig {allowNewQueues, newQueueBasicAuth} <- asks config
pure $ allowNewQueues && maybe True ((== auth_) . Just) newQueueBasicAuth
Cmd SRecipient command ->
Just <$> case command of
SUB -> withQueue subscribeQueue
case command of
SUB -> withQueue' subscribeQueueAndDeliver
GET -> withQueue getMessage
ACK msgId -> withQueue $ acknowledgeMsg msgId
KEY sKey -> withQueue $ \q _ -> either err (corrId,entId,) <$> secureQueue_ q sKey
@@ -1453,15 +1453,15 @@ client
Nothing -> pure ok
NKEY nKey dhKey -> withQueue $ \q _ -> addQueueNotifier_ q nKey dhKey
NDEL -> withQueue $ \q _ -> deleteQueueNotifier_ q
OFF -> maybe (pure $ err INTERNAL) suspendQueue_ q_
DEL -> maybe (pure $ err INTERNAL) delQueueAndMsgs q_
OFF -> response <$> maybe (pure $ err INTERNAL) suspendQueue_ q_
DEL -> response <$> maybe (pure $ err INTERNAL) delQueueAndMsgs q_
QUE -> withQueue $ \q qr -> (corrId,entId,) <$> getQueueInfo q qr
Cmd SRecipientService SUBS -> pure $ Just $ err (CMD PROHIBITED) -- "TODO [certs rcv]"
Cmd SRecipientService SUBS -> pure $ response $ err (CMD PROHIBITED) -- "TODO [certs rcv]"
where
createQueue :: NewQueueReq -> M s (Transmission BrokerMsg)
createQueue NewQueueReq {rcvAuthKey, rcvDhKey, subMode, queueReqData}
| isJust service && subMode == SMOnlyCreate = pure (corrId, entId, ERR $ CMD PROHIBITED)
| otherwise = time "NEW" $ do
| isJust clntServiceId && subMode == SMOnlyCreate = pure (corrId, entId, ERR $ CMD PROHIBITED)
| otherwise = do
g <- asks random
idSize <- asks $ queueIdBytes . config
updatedAt <- Just <$> liftIO getSystemDate
@@ -1491,7 +1491,6 @@ client
-- notifierId <- randId
-- pure (NtfCreds {notifierId, notifierKey, rcvNtfDhSecret}, ServerNtfCreds notifierId rcvPubDhKey)
let queueMode = queueReqMode <$> queueReqData
rcvServiceId = (\THClientService {serviceId} -> serviceId) <$> service
qr =
QueueRec
{ senderId = sndId,
@@ -1504,7 +1503,7 @@ client
notifier = Nothing, -- fst <$> ntf,
status = EntityActive,
updatedAt,
rcvServiceId
rcvServiceId = clntServiceId
}
liftIO (addQueue ms rcvId qr) >>= \case
Left DUPLICATE_ -- TODO [short links] possibly, we somehow need to understand which IDs caused collision to retry if it's not client-supplied?
@@ -1519,8 +1518,8 @@ client
-- when (isJust ntf) $ incStat $ ntfCreated stats
case subMode of
SMOnlyCreate -> pure ()
SMSubscribe -> void $ subscribeNewQueue rcvId qr -- no need to check if message is available, it's a new queue
pure $ IDS QIK {rcvId, sndId, rcvPublicDhKey, queueMode, linkId = fst <$> queueData, serviceId = rcvServiceId} -- , serverNtfCreds = snd <$> ntf
SMSubscribe -> subscribeNewQueue rcvId qr -- no need to check if message is available, it's a new queue
pure $ IDS QIK {rcvId, sndId, rcvPublicDhKey, queueMode, linkId = fst <$> queueData, serviceId = clntServiceId} -- , serverNtfCreds = snd <$> ntf
(corrId,entId,) <$> tryCreate (3 :: Int)
-- this check allows to support contact queues created prior to SKEY,
@@ -1544,7 +1543,7 @@ client
getQueueLink_ q qr = liftIO $ LNK (senderId qr) <$$> getQueueLinkData (queueStore ms) q entId
addQueueNotifier_ :: StoreQueue s -> NtfPublicAuthKey -> RcvNtfPublicDhKey -> M s (Transmission BrokerMsg)
addQueueNotifier_ q notifierKey dhKey = time "NKEY" $ do
addQueueNotifier_ q notifierKey dhKey = do
(rcvPublicDhKey, privDhKey) <- atomically . C.generateKeyPair =<< asks random
let rcvNtfDhSecret = C.dh' dhKey privDhKey
(corrId,entId,) <$> addNotifierRetry 3 rcvPublicDhKey rcvNtfDhSecret
@@ -1581,47 +1580,60 @@ client
suspendQueue_ (q, _) = liftIO $ either err (const ok) <$> suspendQueue (queueStore ms) q
-- TODO [certs rcv] if serviceId is passed, associate with the service and respond with SOK
subscribeQueue :: StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)
subscribeQueue q qr =
liftIO (TM.lookupIO rId subscriptions) >>= \case
Nothing -> subscribeNewQueue rId qr >>= deliver True
subscribeQueueAndDeliver :: StoreQueue s -> QueueRec -> M s ResponseAndMessage
subscribeQueueAndDeliver q qr =
liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case
Nothing -> subscribeRcvQueue qr >>= deliver True
Just s@Sub {subThread} -> do
stats <- asks serverStats
case subThread of
ProhibitSub -> do
-- cannot use SUB in the same connection where GET was used
incStat $ qSubProhibited stats
pure (corrId, rId, ERR $ CMD PROHIBITED)
pure (err (CMD PROHIBITED), Nothing)
_ -> do
incStat $ qSubDuplicate stats
atomically (tryTakeTMVar $ delivered s) >> deliver False s
atomically (writeTVar (delivered s) Nothing) >> deliver False s
where
rId = recipientId q
deliver :: Bool -> Sub -> M s (Transmission BrokerMsg)
deliver inc sub = do
deliver :: Bool -> Sub -> M s ResponseAndMessage
deliver hasSub sub = do
stats <- asks serverStats
fmap (either (\e -> (corrId, rId, ERR e)) id) $ liftIO $ runExceptT $ do
fmap (either ((,Nothing) . err) id) $ liftIO $ runExceptT $ do
msg_ <- tryPeekMsg ms q
liftIO $ when (inc && isJust msg_) $ incStat (qSub stats)
liftIO $ deliverMessage "SUB" qr rId sub msg_
msg' <- forM msg_ $ \msg -> liftIO $ do
ts <- getSystemSeconds
atomically $ setDelivered sub msg ts
unless hasSub $ incStat $ qSub stats
pure (NoCorrId, entId, MSG (encryptMsg qr msg))
pure ((corrId, entId, SOK clntServiceId), msg')
subscribeNewQueue :: RecipientId -> QueueRec -> M s Sub
subscribeNewQueue rId QueueRec {rcvServiceId} = time "SUB newSub" . atomically $ do
writeTQueue (subQ subscribers) (CSClient rId rcvServiceId Nothing, clientId)
-- TODO [certs rcv] combine with subscribing ntf queues
subscribeRcvQueue :: QueueRec -> M s Sub
subscribeRcvQueue QueueRec {rcvServiceId} = atomically $ do
writeTQueue (subQ subscribers) (CSClient entId rcvServiceId Nothing, clientId)
sub <- newSubscription NoSub
TM.insert rId sub subscriptions
TM.insert entId sub $ subscriptions clnt
pure sub
subscribeNewQueue :: RecipientId -> QueueRec -> M s ()
subscribeNewQueue rId QueueRec {rcvServiceId} = do
case rcvServiceId of
Just _ -> atomically $ modifyTVar' (serviceSubsCount clnt) (+ 1)
Nothing -> do
sub <- atomically $ newSubscription NoSub
atomically $ TM.insert rId sub $ subscriptions clnt
atomically $ writeTQueue (subQ subscribers) (CSClient rId rcvServiceId rcvServiceId, clientId)
-- clients that use GET are not added to server subscribers
getMessage :: StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)
getMessage q qr = time "GET" $ do
atomically (TM.lookup entId subscriptions) >>= \case
getMessage q qr = do
atomically (TM.lookup entId $ subscriptions clnt) >>= \case
Nothing ->
atomically newSub >>= (`getMessage_` Nothing)
Just s@Sub {subThread} ->
case subThread of
ProhibitSub ->
atomically (tryTakeTMVar $ delivered s)
atomically (swapTVar (delivered s) Nothing)
>>= getMessage_ s
-- cannot use GET in the same connection where there is an active subscription
_ -> do
@@ -1632,7 +1644,7 @@ client
newSub :: STM Sub
newSub = do
s <- newProhibitedSub
TM.insert entId s subscriptions
TM.insert entId s $ subscriptions clnt
-- Here we don't account for this client as subscribed in the server
-- and don't notify other subscribed clients.
-- This is tracked as "subscription" in the client to prevent these
@@ -1650,27 +1662,32 @@ client
atomically $ setDelivered s msg ts $> (corrId, entId, MSG encMsg)
Nothing -> incStat (msgGetNoMsg stats) $> ok
withQueue :: (StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)) -> M s (Transmission BrokerMsg)
withQueue = withQueue_ True
withQueue :: (StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)) -> M s (Maybe ResponseAndMessage)
withQueue = fmap response . withQueue_ True err
{-# INLINE withQueue #-}
withQueue' :: (StoreQueue s -> QueueRec -> M s ResponseAndMessage) -> M s (Maybe ResponseAndMessage)
withQueue' = fmap Just . withQueue_ True ((,Nothing) . err)
{-# INLINE withQueue' #-}
-- SEND passes queueNotBlocked False here to update time, but it fails anyway on blocked queues (see code for SEND).
withQueue_ :: Bool -> (StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)) -> M s (Transmission BrokerMsg)
withQueue_ queueNotBlocked action = case q_ of
Nothing -> pure $ err INTERNAL
withQueue_ :: Bool -> (ErrorType -> r) -> (StoreQueue s -> QueueRec -> M s r) -> M s r
withQueue_ queueNotBlocked err' action = case q_ of
Nothing -> pure $ err' INTERNAL
Just (q, qr@QueueRec {status, updatedAt}) -> case status of
EntityBlocked info | queueNotBlocked -> pure $ err $ BLOCKED info
EntityBlocked info | queueNotBlocked -> pure $ err' $ BLOCKED info
_ -> do
t <- liftIO getSystemDate
if updatedAt == Just t
then action q qr
else liftIO (updateQueueTime (queueStore ms) q t) >>= either (pure . err) (action q)
else liftIO (updateQueueTime (queueStore ms) q t) >>= either (pure . err') (action q)
subscribeNotifications :: StoreQueue s -> NtfCreds -> M s BrokerMsg
subscribeNotifications q NtfCreds {ntfServiceId} = do
stats <- asks serverStats
let incNtfSrvStat sel = incStat $ sel $ ntfServices stats
case service of
Just THClientService {serviceId}
case clntServiceId of
Just serviceId
| ntfServiceId == Just serviceId -> do
-- duplicate queue-service association - can only happen in case of response error/timeout
hasSub <- atomically $ ifM hasServiceSub (pure True) (False <$ newServiceQueueSub)
@@ -1719,25 +1736,25 @@ client
incStat $ if hasSub then ntfSubDuplicate stats else ntfSub stats
pure $ SOK Nothing
subscribeServiceNotifications :: THPeerClientService -> M s BrokerMsg
subscribeServiceNotifications THClientService {serviceId} = do
subscribeServiceNotifications :: ServiceId -> M s BrokerMsg
subscribeServiceNotifications serviceId = do
subscribed <- readTVarIO ntfServiceSubscribed
if subscribed
then SOKS <$> readTVarIO ntfServiceSubsCount
else
liftIO (getNtfServiceQueueCount @(StoreQueue s) (queueStore ms) serviceId) >>= \case
liftIO (getServiceQueueCount @(StoreQueue s) (queueStore ms) SNotifierService serviceId) >>= \case
Left e -> pure $ ERR e
Right !count' -> do
atomically $ do
incCount <- atomically $ do
writeTVar ntfServiceSubscribed True
count <- swapTVar ntfServiceSubsCount count'
modifyTVar' (totalServiceSubs ntfSubscribers) (+ (count' - count)) -- server count for all services
atomically $ writeTQueue (subQ ntfSubscribers) (CSService serviceId, clientId)
pure $ count' - count
atomically $ writeTQueue (subQ ntfSubscribers) (CSService serviceId incCount, clientId)
pure $ SOKS count'
acknowledgeMsg :: MsgId -> StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)
acknowledgeMsg msgId q qr = time "ACK" $ do
liftIO (TM.lookupIO entId subscriptions) >>= \case
acknowledgeMsg msgId q qr =
liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case
Nothing -> pure $ err NO_MSG
Just sub ->
atomically (getDelivered sub) >>= \case
@@ -1751,16 +1768,20 @@ client
pure ok
_ -> do
(deletedMsg_, msg_) <- tryDelPeekMsg ms q msgId
liftIO $ mapM_ (updateStats stats False ts) deletedMsg_
liftIO $ deliverMessage "ACK" qr entId sub msg_
liftIO $ do
mapM_ (updateStats stats False ts) deletedMsg_
forM_ msg_ $ \msg -> do
ts <- getSystemSeconds
atomically $ setDelivered sub msg ts
pure (corrId, entId, maybe OK (MSG . encryptMsg qr) msg_)
_ -> pure $ err NO_MSG
where
getDelivered :: Sub -> STM (Maybe (ServerSub, RoundedSystemTime))
getDelivered Sub {delivered, subThread} = do
tryTakeTMVar delivered $>>= \v@(msgId', ts) ->
readTVar delivered $>>= \(msgId', ts) ->
if msgId == msgId' || B.null msgId
then pure $ Just (subThread, ts)
else putTMVar delivered v $> Nothing
then writeTVar delivered Nothing $> Just (subThread, ts)
else pure Nothing
updateStats :: ServerStats -> Bool -> RoundedSystemTime -> Message -> IO ()
updateStats stats isGet deliveryTime = \case
MessageQuota {} -> pure ()
@@ -1805,7 +1826,7 @@ client
deleteQueueLinkData (queueStore ms) q
ServerConfig {messageExpiration, msgIdBytes} <- asks config
msgId <- randomId' msgIdBytes
msg_ <- liftIO $ time "SEND" $ runExceptT $ do
msg_ <- liftIO $ runExceptT $ do
expireMessages messageExpiration stats
msg <- liftIO $ mkMessage msgId body
writeMsg ms q True msg
@@ -1814,7 +1835,7 @@ client
Right Nothing -> do
incStat $ msgSentQuota stats
pure $ err QUOTA
Right (Just (msg, wasEmpty)) -> time "SEND ok" $ do
Right (Just (msg, wasEmpty)) -> do
when wasEmpty $ liftIO $ tryDeliverMessage msg
when (notification msgFlags) $ do
mapM_ (`enqueueNotification` msg) (notifier qr)
@@ -1855,17 +1876,17 @@ client
deliverToSub rcv = do
ts <- getSystemSeconds
atomically $
-- reading client TVar in the same transaction,
-- so that if subscription ends, it re-evalutates
-- and delivery is cancelled -
-- the new client will receive message in response to SUB.
-- reading client TVar in the same transaction,
-- so that if subscription ends, it re-evalutates
-- and delivery is cancelled -
-- the new client will receive message in response to SUB.
readTVar rcv
$>>= \rc@Client {subscriptions = subs, sndQ = sndQ'} -> TM.lookup rId subs
$>>= \s@Sub {subThread, delivered} -> case subThread of
ProhibitSub -> pure Nothing
ServerSub st -> readTVar st >>= \case
NoSub ->
tryTakeTMVar delivered >>= \case
readTVar delivered >>= \case
Just _ -> pure Nothing -- if a message was already delivered, should not deliver more
Nothing ->
ifM
@@ -1875,8 +1896,8 @@ client
_ -> pure Nothing
deliver sndQ' s ts = do
let encMsg = encryptMsg qr msg
writeTBQueue sndQ' [(CorrId "", rId, MSG encMsg)]
void $ setDelivered s msg ts
writeTBQueue sndQ' ([(NoCorrId, rId, MSG encMsg)], [])
setDelivered s msg ts
forkDeliver (rc@Client {sndQ = sndQ'}, s@Sub {delivered}, st) = do
t <- mkWeakThreadId =<< forkIO deliverThread
atomically $ modifyTVar' st $ \case
@@ -1889,10 +1910,10 @@ client
-- lookup can be outside of STM transaction,
-- as long as the check that it is the same client is inside.
getSubscribedClient rId (queueSubscribers subscribers) >>= mapM_ deliverIfSame
deliverIfSame rcv = time "deliver" $ do
deliverIfSame rcv = do
ts <- getSystemSeconds
atomically $ whenM (sameClient rc rcv) $
tryTakeTMVar delivered >>= \case
readTVar delivered >>= \case
Just _ -> pure () -- if a message was already delivered, should not deliver more
Nothing -> do
-- a separate thread is needed because it blocks when client sndQ is full.
@@ -1936,7 +1957,8 @@ client
Left r -> pure r
-- rejectOrVerify filters allowed commands, no need to repeat it here.
-- INTERNAL is used because processCommand never returns Nothing for sender commands (could be extracted for better types).
Right t''@(_, (corrId', entId', _)) -> fromMaybe (corrId', entId', ERR INTERNAL) <$> lift (processCommand Nothing fwdVersion t'')
-- `fst` removes empty message that is only returned for `SUB` command
Right t''@(_, (corrId', entId', _)) -> maybe (corrId', entId', ERR INTERNAL) fst <$> lift (processCommand Nothing fwdVersion t'')
-- encode response
r' <- case batchTransmissions clntTHParams [Right (Nothing, encodeTransmission clntTHParams r)] of
[] -> throwE INTERNAL -- at least 1 item is guaranteed from NonEmpty/Right
@@ -1969,22 +1991,6 @@ client
VRVerified q -> Right (q, t'')
VRFailed e -> Left (corrId', entId', ERR e)
deliverMessage :: T.Text -> QueueRec -> RecipientId -> Sub -> Maybe Message -> IO (Transmission BrokerMsg)
deliverMessage name qr rId s@Sub {subThread} msg_ = time (name <> " deliver") $
case subThread of
ProhibitSub -> pure resp
_ -> case msg_ of
Just msg -> do
ts <- getSystemSeconds
let encMsg = encryptMsg qr msg
atomically (setDelivered s msg ts) $> (corrId, rId, MSG encMsg)
_ -> pure resp
where
resp = (corrId, rId, OK)
time :: MonadIO m => T.Text -> m a -> m a
time name = timed name entId
encryptMsg :: QueueRec -> Message -> RcvMessage
encryptMsg qr msg = encrypt . encodeRcvMsgBody $ case msg of
Message {msgFlags, msgBody} -> RcvMsgBody {msgTs = msgTs', msgFlags, msgBody}
@@ -1995,21 +2001,21 @@ client
msgId' = messageId msg
msgTs' = messageTs msg
setDelivered :: Sub -> Message -> RoundedSystemTime -> STM Bool
setDelivered :: Sub -> Message -> RoundedSystemTime -> STM ()
setDelivered Sub {delivered} msg !ts = do
let !msgId = messageId msg
tryPutTMVar delivered (msgId, ts)
writeTVar delivered $ Just (msgId, ts)
delQueueAndMsgs :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg)
delQueueAndMsgs (q, QueueRec {rcvServiceId}) = do
liftIO (deleteQueue ms q) >>= \case
Right qr -> do
-- Possibly, the same should be done if the queue is suspended, but currently we do not use it
atomically $ do
writeTQueue (subQ subscribers) (CSDeleted entId rcvServiceId, clientId)
-- queue is usually deleted by the same client that is currently subscribed,
-- we delete subscription here, so the client with no subscriptions can be disconnected.
TM.delete entId subscriptions
-- queue is usually deleted by the same client that is currently subscribed,
-- we delete subscription here, so the client with no subscriptions can be disconnected.
sub <- atomically $ TM.lookupDelete entId $ subscriptions clnt
liftIO $ mapM_ cancelSub sub
atomically $ writeTQueue (subQ subscribers) (CSDeleted entId rcvServiceId, clientId)
forM_ (notifier qr) $ \NtfCreds {notifierId = nId, ntfServiceId} -> do
-- queue is deleted by a different client from the one subscribed to notifications,
-- so we don't need to remove subscription from the current client.
@@ -2024,7 +2030,7 @@ client
getQueueInfo :: StoreQueue s -> QueueRec -> M s BrokerMsg
getQueueInfo q QueueRec {senderKey, notifier} = do
fmap (either ERR INFO) $ liftIO $ runExceptT $ do
qiSub <- liftIO $ TM.lookupIO entId subscriptions >>= mapM mkQSub
qiSub <- liftIO $ TM.lookupIO entId (subscriptions clnt) >>= mapM mkQSub
qiSize <- getQueueSize ms q
qiMsg <- toMsgInfo <$$> tryPeekMsg ms q
let info = QueueInfo {qiSnd = isJust senderKey, qiNtf = isJust notifier, qiSub, qiSize, qiMsg}
@@ -2039,14 +2045,20 @@ client
SubPending -> QSubPending
SubThread _ -> QSubThread
ProhibitSub -> pure QProhibitSub
qDelivered <- atomically $ decodeLatin1 . encode . fst <$$> tryReadTMVar delivered
qDelivered <- decodeLatin1 . encode . fst <$$> readTVarIO delivered
pure QSub {qSubThread, qDelivered}
ok :: Transmission BrokerMsg
ok = (corrId, entId, OK)
{-# INLINE ok #-}
err :: ErrorType -> Transmission BrokerMsg
err e = (corrId, entId, ERR e)
{-# INLINE err #-}
response :: Transmission BrokerMsg -> Maybe ResponseAndMessage
response = Just . (,Nothing)
{-# INLINE response #-}
updateDeletedStats :: QueueRec -> M s ()
updateDeletedStats q = do
@@ -2060,18 +2072,6 @@ incStat :: MonadIO m => IORef Int -> m ()
incStat r = liftIO $ atomicModifyIORef'_ r (+ 1)
{-# INLINE incStat #-}
timed :: MonadIO m => T.Text -> RecipientId -> m a -> m a
timed name (EntityId qId) a = do
t <- liftIO getSystemTime
r <- a
t' <- liftIO getSystemTime
let int = diff t t'
when (int > sec) . logDebug $ T.unwords [name, tshow $ encode qId, tshow int]
pure r
where
diff t t' = (systemSeconds t' - systemSeconds t) * sec + fromIntegral (systemNanoseconds t' - systemNanoseconds t)
sec = 1000_000000
randomId' :: Int -> M s ByteString
randomId' n = atomically . C.randomBytes n =<< asks random

View File

@@ -40,6 +40,7 @@ module Simplex.Messaging.Server.Env.STM
MsgStore (..),
AStoreType (..),
VerifiedTransmission,
ResponseAndMessage,
newEnv,
mkJournalStoreConfig,
msgStore,
@@ -377,7 +378,7 @@ sameClient c cv = maybe False (sameClientId c) <$> readTVar cv
data ClientSub
= CSClient QueueId (Maybe ServiceId) (Maybe ServiceId) -- includes previous and new associated service IDs
| CSDeleted QueueId (Maybe ServiceId) -- includes previously associated service IDs
| CSService ServiceId -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB
| CSService ServiceId Int64 -- only send END to idividual client subs on message delivery, not of SSUB/NSSUB
newtype ProxyAgent = ProxyAgent
{ smpAgent :: SMPClientAgent 'Sender
@@ -394,7 +395,7 @@ data Client s = Client
serviceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
ntfServiceSubsCount :: TVar Int64, -- only one service can be subscribed, based on its certificate, this is subscription count
rcvQ :: TBQueue (NonEmpty (VerifiedTransmission s)),
sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
sndQ :: TBQueue (NonEmpty (Transmission BrokerMsg), [Transmission BrokerMsg]),
msgQ :: TBQueue (NonEmpty (Transmission BrokerMsg)),
procThreads :: TVar Int,
endThreads :: TVar (IntMap (Weak ThreadId)),
@@ -408,13 +409,15 @@ data Client s = Client
type VerifiedTransmission s = (Maybe (StoreQueue s, QueueRec), Transmission Cmd)
type ResponseAndMessage = (Transmission BrokerMsg, Maybe (Transmission BrokerMsg))
data ServerSub = ServerSub (TVar SubscriptionThread) | ProhibitSub
data SubscriptionThread = NoSub | SubPending | SubThread (Weak ThreadId)
data Sub = Sub
{ subThread :: ServerSub, -- Nothing value indicates that sub
delivered :: TMVar (MsgId, RoundedSystemTime)
delivered :: TVar (Maybe (MsgId, RoundedSystemTime))
}
newServer :: IO (Server s)
@@ -497,13 +500,13 @@ newClient clientId qSize clientTHParams createdAt = do
newSubscription :: SubscriptionThread -> STM Sub
newSubscription st = do
delivered <- newEmptyTMVar
delivered <- newTVar Nothing
subThread <- ServerSub <$> newTVar st
return Sub {subThread, delivered}
newProhibitedSub :: STM Sub
newProhibitedSub = do
delivered <- newEmptyTMVar
delivered <- newTVar Nothing
return Sub {subThread = ProhibitSub, delivered}
newEnv :: ServerConfig s -> IO (Env s)

View File

@@ -356,8 +356,8 @@ instance QueueStoreClass (JournalQueue s) (QStore s) where
{-# INLINE setQueueService #-}
getQueueNtfServices = withQS (getQueueNtfServices @(JournalQueue s))
{-# INLINE getQueueNtfServices #-}
getNtfServiceQueueCount = withQS (getNtfServiceQueueCount @(JournalQueue s))
{-# INLINE getNtfServiceQueueCount #-}
getServiceQueueCount = withQS (getServiceQueueCount @(JournalQueue s))
{-# INLINE getServiceQueueCount #-}
makeQueue_ :: JournalMsgStore s -> RecipientId -> QueueRec -> Lock -> IO (JournalQueue s)
makeQueue_ JournalMsgStore {sharedLock} rId qr queueLock = do

View File

@@ -43,14 +43,13 @@ import qualified Data.ByteString.Builder as BB
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Lazy as LB
import Data.Bitraversable (bimapM)
import Data.Either (fromRight, lefts, rights)
import Data.Either (fromRight, lefts)
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (foldl', intersperse, partition)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes, fromMaybe, mapMaybe)
import Data.Maybe (catMaybes, fromMaybe)
import qualified Data.Set as S
import Data.Text (Text)
import Data.Text.Encoding (decodeLatin1, encodeUtf8)
@@ -65,7 +64,7 @@ import Database.PostgreSQL.Simple.ToField (Action (..), ToField (..))
import Database.PostgreSQL.Simple.Errors (ConstraintViolation (..), constraintViolation)
import Database.PostgreSQL.Simple.SqlQQ (sql)
import GHC.IO (catchAny)
import Simplex.Messaging.Agent.Client (withLockMap, withLocksMap)
import Simplex.Messaging.Agent.Client (withLockMap)
import Simplex.Messaging.Agent.Lock (Lock)
import Simplex.Messaging.Agent.Store.AgentStore ()
import Simplex.Messaging.Agent.Store.Postgres (createDBStore, closeDBStore)
@@ -84,7 +83,7 @@ import Simplex.Messaging.Server.StoreLog
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (SMPServiceRole (..))
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, tshow, (<$$>), ($>>=))
import Simplex.Messaging.Util (eitherToMaybe, firstRow, ifM, maybeFirstRow, tshow, (<$$>))
import System.Exit (exitFailure)
import System.IO (IOMode (..), hFlush, stdout)
import UnliftIO.STM
@@ -485,11 +484,15 @@ instance StoreQueueClass q => QueueStoreClass q (PostgresQueueStore q) where
let (sNtfs, restNtfs) = partition (\(nId, _) -> S.member nId snIds) ntfs'
in ((serviceId, sNtfs) : ssNtfs, restNtfs)
getNtfServiceQueueCount :: PostgresQueueStore q -> ServiceId -> IO (Either ErrorType Int64)
getNtfServiceQueueCount st serviceId =
E.uninterruptibleMask_ $ runExceptT $ withDB' "getNtfServiceQueueCount" st $ \db ->
getServiceQueueCount :: (PartyI p, ServiceParty p) => PostgresQueueStore q -> SParty p -> ServiceId -> IO (Either ErrorType Int64)
getServiceQueueCount st party serviceId =
E.uninterruptibleMask_ $ runExceptT $ withDB' "getServiceQueueCount" st $ \db ->
fmap (fromMaybe 0) $ maybeFirstRow fromOnly $
DB.query db "SELECT count(1) FROM msg_queues WHERE ntf_service_id = ? AND deleted_at IS NULL" (Only serviceId)
DB.query db query (Only serviceId)
where
query = case party of
SRecipientService -> "SELECT count(1) FROM msg_queues WHERE rcv_service_id = ? AND deleted_at IS NULL"
SNotifierService -> "SELECT count(1) FROM msg_queues WHERE ntf_service_id = ? AND deleted_at IS NULL"
batchInsertServices :: [STMService] -> PostgresQueueStore q -> IO Int64
batchInsertServices services' toStore =

View File

@@ -346,10 +346,15 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
let (sNtfs, restNtfs) = partition (\(nId, _) -> S.member nId snIds) ntfs'
pure ((Just serviceId, sNtfs) : ssNtfs, restNtfs)
getNtfServiceQueueCount :: STMQueueStore q -> ServiceId -> IO (Either ErrorType Int64)
getNtfServiceQueueCount st serviceId =
getServiceQueueCount :: (PartyI p, ServiceParty p) => STMQueueStore q -> SParty p -> ServiceId -> IO (Either ErrorType Int64)
getServiceQueueCount st party serviceId =
TM.lookupIO serviceId (services st) >>=
maybe (pure $ Left AUTH) (fmap (Right . fromIntegral . S.size) . readTVarIO . serviceNtfQueues)
maybe (pure $ Left AUTH) (fmap (Right . fromIntegral . S.size) . readTVarIO . serviceSel)
where
serviceSel :: STMService -> TVar (Set QueueId)
serviceSel = case party of
SRecipientService -> serviceRcvQueues
SNotifierService -> serviceNtfQueues
withQueueRec :: TVar (Maybe QueueRec) -> (QueueRec -> STM a) -> IO (Either ErrorType a)
withQueueRec qr a = atomically $ readQueueRec qr >>= mapM a

View File

@@ -48,7 +48,7 @@ class StoreQueueClass q => QueueStoreClass q s where
getCreateService :: s -> ServiceRec -> IO (Either ErrorType ServiceId)
setQueueService :: (PartyI p, ServiceParty p) => s -> q -> SParty p -> Maybe ServiceId -> IO (Either ErrorType ())
getQueueNtfServices :: s -> [(NotifierId, a)] -> IO (Either ErrorType ([(Maybe ServiceId, [(NotifierId, a)])], [(NotifierId, a)]))
getNtfServiceQueueCount :: s -> ServiceId -> IO (Either ErrorType Int64)
getServiceQueueCount :: (PartyI p, ServiceParty p) => s -> SParty p -> ServiceId -> IO (Either ErrorType Int64)
data EntityCounts = EntityCounts
{ queueCount :: Int,

View File

@@ -47,6 +47,7 @@ batchingTests = do
it "should batch with 135 subscriptions per batch" testClientBatchSubscriptions
it "should batch with 255 ENDs per batch" testClientBatchENDs
it "should batch with 80 NMSGs per batch" testClientBatchNMSGs
it "should batch subscription responses with message" testBatchSubResponses
it "should break on message that does not fit" testClientBatchWithMessage
it "should break on large message" testClientBatchWithLargeMessage
@@ -207,6 +208,20 @@ testClientBatchNMSGs = do
(length rs1, length rs2, length rs3) `shouldBe` (40, 80, 80)
all lenOk [s1, s2, s3] `shouldBe` True
-- 4 responses are used in Simplex.Messaging.Server / `send`
testBatchSubResponses :: IO ()
testBatchSubResponses = do
client <- testClientStub
soks <- replicateM 4 $ randomSOK
msg <- randomMSG
let msgs = map (\t -> Right (Nothing, encodeTransmission (thParams client) t)) (soks <> [msg])
batches = batchTransmissions (thParams client) $ L.fromList msgs
length batches `shouldBe` 1
soks' <- replicateM 5 $ randomSOK
let msgs' = map (\t -> Right (Nothing, encodeTransmission (thParams client) t)) (soks' <> [msg])
batches' = batchTransmissions (thParams client) $ L.fromList msgs'
length batches' `shouldBe` 2
testClientBatchWithMessageV6 :: IO ()
testClientBatchWithMessageV6 = do
client <- testClientStubV6
@@ -361,6 +376,22 @@ randomNMSGCmd ts = do
Right encNMsgMeta <- pure $ C.cbEncrypt (C.dh' k pk) nonce (smpEncode msgMeta) 128
pure (CorrId "", EntityId nId, NMSG nonce encNMsgMeta)
randomSOK :: IO (Transmission BrokerMsg)
randomSOK = do
g <- C.newRandom
corrId <- atomically $ C.randomBytes 24 g
rId <- atomically $ C.randomBytes 24 g
pure (CorrId corrId, EntityId rId, SOK Nothing)
randomMSG :: IO (Transmission BrokerMsg)
randomMSG = do
g <- C.newRandom
corrId <- atomically $ C.randomBytes 24 g
rId <- atomically $ C.randomBytes 24 g
msgId <- atomically $ C.randomBytes 24 g
msg <- atomically $ C.randomBytes (maxMessageLength currentClientSMPRelayVersion) g
pure (CorrId corrId, EntityId rId, MSG RcvMessage {msgId, msgBody = EncRcvMsgBody msg})
randomSENDv6 :: ByteString -> Int -> IO (Either TransportError (Maybe TAuthorizations, ByteString))
randomSENDv6 = randomSEND_ C.SEd25519 minServerSMPRelayVersion

View File

@@ -18,6 +18,7 @@
module ServerTests where
import Control.Concurrent (ThreadId, killThread, threadDelay)
import Control.Concurrent.Async (concurrently_)
import Control.Concurrent.STM
import Control.Exception (SomeException, throwIO, try)
import Control.Monad
@@ -29,6 +30,7 @@ import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Hashable (hash)
import qualified Data.IntSet as IS
import Data.List.NonEmpty (NonEmpty)
import Data.String (IsString (..))
import Data.Type.Equality
import qualified Data.X509.Validation as XV
@@ -75,6 +77,7 @@ serverTests = do
describe "GET command" testGetCommand
describe "GET & SUB commands" testGetSubCommands
describe "Exceeding queue quota" testExceedQueueQuota
describe "Concurrent sending and delivery" testConcurrentSendDelivery
describe "Store log" testWithStoreLog
describe "Restore messages" testRestoreMessages
describe "Restore messages (old / v2)" testRestoreExpireMessages
@@ -111,16 +114,25 @@ sendRecv h@THandle {params} (sgn, corrId, qId, cmd) = do
tGet1 h
signSendRecv :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> (ByteString, EntityId, Command p) -> IO (Transmission (Either ErrorType BrokerMsg))
signSendRecv h pk = signSendRecv_ h pk Nothing
signSendRecv h pk t = do
[r] <- signSendRecv_ h pk Nothing t
pure r
signSendRecv2 :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> (ByteString, EntityId, Command p) -> IO (Transmission (Either ErrorType BrokerMsg), Transmission (Either ErrorType BrokerMsg))
signSendRecv2 h pk t = do
[r1, r2] <- signSendRecv_ h pk Nothing t
pure (r1, r2)
serviceSignSendRecv :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> C.PrivateKeyEd25519 -> (ByteString, EntityId, Command p) -> IO (Transmission (Either ErrorType BrokerMsg))
serviceSignSendRecv h pk = signSendRecv_ h pk . Just
serviceSignSendRecv h pk serviceKey t = do
[r] <- signSendRecv_ h pk (Just serviceKey) t
pure r
signSendRecv_ :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> Maybe C.PrivateKeyEd25519 -> (ByteString, EntityId, Command p) -> IO (Transmission (Either ErrorType BrokerMsg))
signSendRecv_ :: forall c p. (Transport c, PartyI p) => THandleSMP c 'TClient -> C.APrivateAuthKey -> Maybe C.PrivateKeyEd25519 -> (ByteString, EntityId, Command p) -> IO (NonEmpty (Transmission (Either ErrorType BrokerMsg)))
signSendRecv_ h@THandle {params} (C.APrivateAuthKey a pk) serviceKey_ (corrId, qId, cmd) = do
let TransmissionForAuth {tForAuth, tToSend} = encodeTransmissionForAuth params (CorrId corrId, qId, cmd)
Right () <- tPut1 h (authorize tForAuth, tToSend)
tGet1 h
liftIO $ tGetClient h
where
authorize t = (,(`C.sign'` t) <$> serviceKey_) <$> case a of
C.SEd25519 -> Just . TASignature . C.ASignature C.SEd25519 $ C.sign' pk t'
@@ -365,7 +377,7 @@ testCreateDelete =
Resp "bcda" _ ok4 <- signSendRecv rh rKey ("bcda", rId, OFF)
(ok4, OK) #== "accepts OFF when suspended"
Resp "cdab" _ (Msg mId2 msg2) <- signSendRecv rh rKey ("cdab", rId, SUB)
(Resp "cdab" _ (SOK Nothing), Resp "" _ (Msg mId2 msg2)) <- signSendRecv2 rh rKey ("cdab", rId, SUB)
(dec mId2 msg2, Right "hello") #== "accepts SUB when suspended and delivers the message again (because was not ACKed)"
Resp "dabc" _ err5 <- sendRecv rh (sampleSig, "dabc", rId, DEL)
@@ -404,7 +416,7 @@ stressTest =
Resp "" NoEntity (Ids rId _ _) <- signSendRecv h1 rKey ("", NoEntity, New rPub dhPub)
pure rId
let subscribeQueues h = forM_ rIds $ \rId -> do
Resp "" rId' OK <- signSendRecv h rKey ("", rId, SUB)
Resp "" rId' (SOK Nothing) <- signSendRecv h rKey ("", rId, SUB)
rId' `shouldBe` rId
closeConnection $ connection h1
subscribeQueues h2
@@ -497,7 +509,7 @@ testSwitchSub =
Resp "abcd" _ (Msg mId2 msg2) <- signSendRecv rh1 rKey ("abcd", rId, ACK mId1)
(dec mId2 msg2, Right "test2, no ACK") #== "test message 2 delivered, no ACK"
Resp "bcda" _ (Msg mId2' msg2') <- signSendRecv rh2 rKey ("bcda", rId, SUB)
(Resp "bcda" _ (SOK Nothing), Resp "" _ (Msg mId2' msg2')) <- signSendRecv2 rh2 rKey ("bcda", rId, SUB)
(dec mId2' msg2', Right "test2, no ACK") #== "same simplex queue via another TCP connection, tes2 delivered again (no ACK in 1st queue)"
Resp "cdab" _ OK <- signSendRecv rh2 rKey ("cdab", rId, ACK mId2')
@@ -620,6 +632,27 @@ testExceedQueueQuota =
Resp "10" _ OK <- signSendRecv rh rKey ("10", rId, ACK mId4)
pure ()
testConcurrentSendDelivery :: SpecWith (ASrvTransport, AStoreType)
testConcurrentSendDelivery =
it "should continue delivering messages if message is sent before it is acknowledged" $ \(ATransport t, msType) -> do
g <- C.newRandom
smpTest3 t msType $ \rh sh1 sh2 -> do
(sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
(sId, rId, rKey, dhShared) <- createAndSecureQueue rh sPub
let dec = decryptMsgV3 dhShared
sndMsg sh n = do
Resp (CorrId n') _ OK <- signSendRecv sh sKey (n, sId, _SEND ("msg " <> n))
n' `shouldBe` n
isMsg1or2 mId msg = dec mId msg == Right "msg 1" || dec mId msg == Right "msg 2" `shouldBe` True
replicateM_ 50 $ do
concurrently_ (sndMsg sh1 "1") (sndMsg sh2 "2")
Resp "" _ (Msg mId1 msg1) <- tGet1 rh
isMsg1or2 mId1 msg1
Resp "3" _ (Msg mId2 msg2) <- signSendRecv rh rKey ("3", rId, ACK mId1)
isMsg1or2 mId2 msg2
Resp "4" _ OK <- signSendRecv rh rKey ("4", rId, ACK mId2)
pure ()
testWithStoreLog :: SpecWith (ASrvTransport, AStoreType)
testWithStoreLog =
it "should store simplex queues to log and restore them after server restart" $ \(at@(ATransport t), msType) -> do
@@ -684,7 +717,7 @@ testWithStoreLog =
nId <- readTVarIO notifierId
Resp "dabc" _ (SOK Nothing) <- signSendRecv h1 nKey ("dabc", nId, NSUB)
Resp "bcda" _ OK <- signSendRecv h sKey1 ("bcda", sId1, _SEND' "hello")
Resp "cdab" _ (Msg mId3 msg3) <- signSendRecv h rKey1 ("cdab", rId1, SUB)
(Resp "cdab" _ (SOK Nothing), Resp "" _ (Msg mId3 msg3)) <- signSendRecv2 h rKey1 ("cdab", rId1, SUB)
(decryptMsgV3 dh1 mId3 msg3, Right "hello") #== "delivered from restored queue"
Resp "" _ (NMSG _ _) <- tGet1 h1
-- this queue is removed - not restored
@@ -769,7 +802,7 @@ testRestoreMessages =
Just rKey <- readTVarIO recipientKey
Just dh <- readTVarIO dhShared
let dec = decryptMsgV3 dh
Resp "2" _ (Msg mId2 msg2) <- signSendRecv h rKey ("2", rId, SUB)
(Resp "2" _ (SOK Nothing), Resp "" _ (Msg mId2 msg2)) <- signSendRecv2 h rKey ("2", rId, SUB)
(dec mId2 msg2, Right "hello 2") #== "restored message delivered"
Resp "3" _ (Msg mId3 msg3) <- signSendRecv h rKey ("3", rId, ACK mId2)
(dec mId3 msg3, Right "hello 3") #== "restored message delivered"
@@ -786,7 +819,7 @@ testRestoreMessages =
Just rKey <- readTVarIO recipientKey
Just dh <- readTVarIO dhShared
let dec = decryptMsgV3 dh
Resp "4" _ (Msg mId4 msg4) <- signSendRecv h rKey ("4", rId, SUB)
(Resp "4" _ (SOK Nothing), Resp "" _ (Msg mId4 msg4)) <- signSendRecv2 h rKey ("4", rId, SUB)
(dec mId4 msg4, Right "hello 4") #== "restored message delivered"
Resp "5" _ (Msg mId5 msg5) <- signSendRecv h rKey ("5", rId, ACK mId4)
(dec mId5 msg5, Right "hello 5") #== "restored message delivered"
@@ -1131,7 +1164,7 @@ testMsgExpireOnSend =
threadDelay 2500000
Resp "2" _ OK <- signSendRecv sh sKey ("2", sId, _SEND "hello (should NOT expire)")
testSMPClient @c $ \rh -> do
Resp "3" _ (Msg mId msg) <- signSendRecv rh rKey ("3", rId, SUB)
(Resp "3" _ (SOK Nothing), Resp "" _ (Msg mId msg)) <- signSendRecv2 rh rKey ("3", rId, SUB)
(dec mId msg, Right "hello (should NOT expire)") #== "delivered"
1000 `timeout` tGetClient @SMPVersion @ErrorType @BrokerMsg rh >>= \case
Nothing -> return ()
@@ -1139,8 +1172,7 @@ testMsgExpireOnSend =
testMsgExpireOnInterval :: SpecWith (ASrvTransport, AStoreType)
testMsgExpireOnInterval =
-- fails on ubuntu
xit' "should expire messages that are not received before messageTTL after expiry interval" $ \(ATransport (t :: TProxy c 'TServer), msType) -> do
it "should expire messages that are not received before messageTTL after expiry interval" $ \(ATransport (t :: TProxy c 'TServer), msType) -> do
g <- C.newRandom
(sPub, sKey) <- atomically $ C.generateAuthKeyPair C.SEd25519 g
let cfg' = updateCfg (cfgMS msType) $ \cfg_ -> cfg_ {messageExpiration = Just ExpirationConfig {ttl = 1, checkInterval = 1}, idleQueueInterval = 1}
@@ -1151,7 +1183,7 @@ testMsgExpireOnInterval =
threadDelay 3000000
testSMPClient @c $ \rh -> do
signSendRecv rh rKey ("2", rId, SUB) >>= \case
Resp "2" _ OK -> pure ()
Resp "2" _ (SOK Nothing) -> pure ()
r -> unexpected r
1000 `timeout` tGetClient @SMPVersion @ErrorType @BrokerMsg rh >>= \case
Nothing -> return ()
@@ -1170,7 +1202,7 @@ testMsgNOTExpireOnInterval =
Resp "1" _ OK <- signSendRecv sh sKey ("1", sId, _SEND "hello (should NOT expire)")
threadDelay 2500000
testSMPClient @c $ \rh -> do
Resp "2" _ (Msg mId msg) <- signSendRecv rh rKey ("2", rId, SUB)
(Resp "2" _ (SOK Nothing), Resp "" _ (Msg mId msg)) <- signSendRecv2 rh rKey ("2", rId, SUB)
(dec mId msg, Right "hello (should NOT expire)") #== "delivered"
1000 `timeout` tGetClient @SMPVersion @ErrorType @BrokerMsg rh >>= \case
Nothing -> return ()