mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-26 13:07:25 +00:00
smp server: count queued and sent END events
This commit is contained in:
@@ -170,9 +170,10 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
serverThread s label subQ subs clientSubs unsub = do
|
||||
labelMyThread label
|
||||
cls <- asks clients
|
||||
stats <- asks serverStats
|
||||
forever $
|
||||
atomically (updateSubscribers cls)
|
||||
$>>= endPreviousSubscriptions
|
||||
$>>= endPreviousSubscriptions stats
|
||||
>>= liftIO . mapM_ unsub
|
||||
where
|
||||
updateSubscribers :: TVar (IM.IntMap (Maybe Client)) -> STM (Maybe (QueueId, Client))
|
||||
@@ -189,10 +190,11 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
yes <- readTVar $ connected c'
|
||||
pure $ if yes then Just (qId, c') else Nothing
|
||||
updateSub qId (subs s) $>>= clientToBeNotified
|
||||
endPreviousSubscriptions :: (QueueId, Client) -> M (Maybe s)
|
||||
endPreviousSubscriptions (qId, c) = do
|
||||
forkClient c (label <> ".endPreviousSubscriptions") $
|
||||
endPreviousSubscriptions :: ServerStats -> (QueueId, Client) -> M (Maybe s)
|
||||
endPreviousSubscriptions stats (qId, c) = do
|
||||
forkClient c (label <> ".endPreviousSubscriptions") $ do
|
||||
atomically $ writeTBQueue (sndQ c) [(CorrId "", qId, END)]
|
||||
incStat $ qSubEnd stats
|
||||
atomically $ TM.lookupDelete qId (clientSubs c)
|
||||
|
||||
receiveFromProxyAgent :: ProxyAgent -> M ()
|
||||
@@ -442,6 +444,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do
|
||||
putStat "qSubNoMsg" qSubNoMsg
|
||||
subs <- (,,) <$> getStat qSubAuth <*> getStat qSubDuplicate <*> getStat qSubProhibited
|
||||
hPutStrLn h $ "other SUB events (auth, duplicate, prohibited): " <> show subs
|
||||
putStat "qSubEnd" qSubEnd
|
||||
putStat "qSubEndSent" qSubEndSent
|
||||
putStat "msgSent" msgSent
|
||||
putStat "msgRecv" msgRecv
|
||||
putStat "msgRecvGet" msgRecvGet
|
||||
@@ -631,9 +635,10 @@ runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessio
|
||||
atomically $ modifyTVar' active $ IM.insert clientId $ Just c
|
||||
s <- asks server
|
||||
expCfg <- asks $ inactiveClientExpiration . config
|
||||
stats <- asks serverStats
|
||||
th <- newMVar h -- put TH under a fair lock to interleave messages and command responses
|
||||
labelMyThread . B.unpack $ "client $" <> encode sessionId
|
||||
raceAny_ $ [liftIO $ send th c, liftIO $ sendMsg th c, client thParams c s, receive h c] <> disconnectThread_ c expCfg
|
||||
raceAny_ $ [liftIO $ send th c stats, liftIO $ sendMsg th c, client thParams c s, receive h c] <> disconnectThread_ c expCfg
|
||||
disconnectThread_ c (Just expCfg) = [liftIO $ disconnectTransport h (rcvActiveAt c) (sndActiveAt c) expCfg (noSubscriptions c)]
|
||||
disconnectThread_ _ _ = []
|
||||
noSubscriptions c = atomically $ (&&) <$> TM.null (ntfSubscriptions c) <*> (not . hasSubs <$> readTVar (subscriptions c))
|
||||
@@ -701,8 +706,8 @@ receive h@THandle {params = THandleParams {thAuth}} Client {rcvQ, sndQ, rcvActiv
|
||||
pure $ Left (corrId, entId, ERR AUTH)
|
||||
write q = mapM_ (atomically . writeTBQueue q) . L.nonEmpty
|
||||
|
||||
send :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> IO ()
|
||||
send th c@Client {sndQ, msgQ, sessionId} = do
|
||||
send :: Transport c => MVar (THandleSMP c 'TServer) -> Client -> ServerStats -> IO ()
|
||||
send th c@Client {sndQ, msgQ, sessionId} stats = do
|
||||
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " send"
|
||||
forever $ atomically (readTBQueue sndQ) >>= sendTransmissions
|
||||
where
|
||||
@@ -715,6 +720,9 @@ send th c@Client {sndQ, msgQ, sessionId} = do
|
||||
-- this will reply OK to all SUBs in the first batched transmission,
|
||||
-- to reduce client timeouts.
|
||||
tSend th c ts'
|
||||
case ts' of
|
||||
[(_, _, END)] -> incStat $ qSubEndSent stats
|
||||
_ -> pure ()
|
||||
-- After that all messages will be sent in separate transmissions,
|
||||
-- without any client response timeouts, and allowing them to interleave
|
||||
-- with other requests responses.
|
||||
|
||||
@@ -31,6 +31,8 @@ data ServerStats = ServerStats
|
||||
qSubAuth :: TVar Int,
|
||||
qSubDuplicate :: TVar Int,
|
||||
qSubProhibited :: TVar Int,
|
||||
qSubEnd :: TVar Int,
|
||||
qSubEndSent :: TVar Int,
|
||||
ntfCreated :: TVar Int,
|
||||
ntfDeleted :: TVar Int,
|
||||
ntfSub :: TVar Int,
|
||||
@@ -77,6 +79,8 @@ data ServerStatsData = ServerStatsData
|
||||
_qSubAuth :: Int,
|
||||
_qSubDuplicate :: Int,
|
||||
_qSubProhibited :: Int,
|
||||
_qSubEnd :: Int,
|
||||
_qSubEndSent :: Int,
|
||||
_ntfCreated :: Int,
|
||||
_ntfDeleted :: Int,
|
||||
_ntfSub :: Int,
|
||||
@@ -125,6 +129,8 @@ newServerStats ts = do
|
||||
qSubAuth <- newTVarIO 0
|
||||
qSubDuplicate <- newTVarIO 0
|
||||
qSubProhibited <- newTVarIO 0
|
||||
qSubEnd <- newTVarIO 0
|
||||
qSubEndSent <- newTVarIO 0
|
||||
ntfCreated <- newTVarIO 0
|
||||
ntfDeleted <- newTVarIO 0
|
||||
ntfSub <- newTVarIO 0
|
||||
@@ -170,6 +176,8 @@ newServerStats ts = do
|
||||
qSubAuth,
|
||||
qSubDuplicate,
|
||||
qSubProhibited,
|
||||
qSubEnd,
|
||||
qSubEndSent,
|
||||
ntfCreated,
|
||||
ntfDeleted,
|
||||
ntfSub,
|
||||
@@ -217,6 +225,8 @@ getServerStatsData s = do
|
||||
_qSubAuth <- readTVarIO $ qSubAuth s
|
||||
_qSubDuplicate <- readTVarIO $ qSubDuplicate s
|
||||
_qSubProhibited <- readTVarIO $ qSubProhibited s
|
||||
_qSubEnd <- readTVarIO $ qSubEnd s
|
||||
_qSubEndSent <- readTVarIO $ qSubEndSent s
|
||||
_ntfCreated <- readTVarIO $ ntfCreated s
|
||||
_ntfDeleted <- readTVarIO $ ntfDeleted s
|
||||
_ntfSub <- readTVarIO $ ntfSub s
|
||||
@@ -262,6 +272,8 @@ getServerStatsData s = do
|
||||
_qSubAuth,
|
||||
_qSubDuplicate,
|
||||
_qSubProhibited,
|
||||
_qSubEnd,
|
||||
_qSubEndSent,
|
||||
_ntfCreated,
|
||||
_ntfDeleted,
|
||||
_ntfSub,
|
||||
@@ -309,6 +321,8 @@ setServerStats s d = do
|
||||
writeTVar (qSubAuth s) $! _qSubAuth d
|
||||
writeTVar (qSubDuplicate s) $! _qSubDuplicate d
|
||||
writeTVar (qSubProhibited s) $! _qSubProhibited d
|
||||
writeTVar (qSubEnd s) $! _qSubEnd d
|
||||
writeTVar (qSubEndSent s) $! _qSubEndSent d
|
||||
writeTVar (ntfCreated s) $! _ntfCreated d
|
||||
writeTVar (ntfDeleted s) $! _ntfDeleted d
|
||||
writeTVar (ntfSub s) $! _ntfSub d
|
||||
@@ -408,6 +422,8 @@ instance StrEncoding ServerStatsData where
|
||||
_qSubAuth <- opt "qSubAuth="
|
||||
_qSubDuplicate <- opt "qSubDuplicate="
|
||||
_qSubProhibited <- opt "qSubProhibited="
|
||||
_qSubEnd <- pure 0
|
||||
_qSubEndSent <- pure 0
|
||||
_ntfCreated <- opt "ntfCreated="
|
||||
_ntfDeleted <- opt "ntfDeleted="
|
||||
_ntfSub <- opt "ntfSub="
|
||||
@@ -464,6 +480,8 @@ instance StrEncoding ServerStatsData where
|
||||
_qSubAuth,
|
||||
_qSubDuplicate,
|
||||
_qSubProhibited,
|
||||
_qSubEnd,
|
||||
_qSubEndSent,
|
||||
_ntfCreated,
|
||||
_ntfDeleted,
|
||||
_ntfSub,
|
||||
|
||||
Reference in New Issue
Block a user