Merge branch 'master' into users

This commit is contained in:
Evgeny Poberezkin
2023-01-14 15:34:18 +00:00
7 changed files with 158 additions and 71 deletions
+1 -1
View File
@@ -1,5 +1,5 @@
name: simplexmq
version: 4.2.2
version: 4.2.3
synopsis: SimpleXMQ message broker
description: |
This package includes <./docs/Simplex-Messaging-Server.html server>,
+1 -1
View File
@@ -5,7 +5,7 @@ cabal-version: 1.12
-- see: https://github.com/sol/hpack
name: simplexmq
version: 4.2.2
version: 4.2.3
synopsis: SimpleXMQ message broker
description: This package includes <./docs/Simplex-Messaging-Server.html server>,
<./docs/Simplex-Messaging-Client.html client> and
+56 -16
View File
@@ -104,8 +104,8 @@ type M a = ReaderT Env IO a
smpServer :: TMVar Bool -> ServerConfig -> M ()
smpServer started cfg@ServerConfig {transports, logTLSErrors} = do
s <- asks server
restoreServerStats
restoreServerMessages
restoreServerStats
raceAny_
( serverThread s subscribedQ subscribers subscriptions cancelSub :
serverThread s ntfSubscribedQ notifiers ntfSubscriptions (\_ -> pure ()) :
@@ -174,7 +174,7 @@ smpServer started cfg@ServerConfig {transports, logTLSErrors} = do
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
threadDelay $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues} <- asks serverStats
ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} <- asks serverStats
let interval = 1000000 * logInterval
withFile statsFilePath AppendMode $ \h -> liftIO $ do
hSetBuffering h LineBuffering
@@ -187,7 +187,31 @@ smpServer started cfg@ServerConfig {transports, logTLSErrors} = do
msgSent' <- atomically $ swapTVar msgSent 0
msgRecv' <- atomically $ swapTVar msgRecv 0
ps <- atomically $ periodStatCounts activeQueues ts
hPutStrLn h $ intercalate "," [iso8601Show $ utctDay fromTime', show qCreated', show qSecured', show qDeleted', show msgSent', show msgRecv', dayCount ps, weekCount ps, monthCount ps]
msgSentNtf' <- atomically $ swapTVar msgSentNtf 0
msgRecvNtf' <- atomically $ swapTVar msgRecvNtf 0
psNtf <- atomically $ periodStatCounts activeQueuesNtf ts
qCount' <- readTVarIO qCount
msgCount' <- readTVarIO msgCount
hPutStrLn h $
intercalate
","
[ iso8601Show $ utctDay fromTime',
show qCreated',
show qSecured',
show qDeleted',
show msgSent',
show msgRecv',
dayCount ps,
weekCount ps,
monthCount ps,
show msgSentNtf',
show msgRecvNtf',
dayCount psNtf,
weekCount psNtf,
monthCount psNtf,
show qCount',
show msgCount'
]
threadDelay interval
runClient :: Transport c => TProxy c -> c -> M ()
@@ -387,6 +411,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
withLog (`logCreateById` rId)
stats <- asks serverStats
atomically $ modifyTVar' (qCreated stats) (+ 1)
atomically $ modifyTVar' (qCount stats) (+ 1)
subscribeQueue qr rId $> IDS (qik ids)
logCreateById :: StoreLog 'WriteMode -> RecipientId -> IO ()
@@ -509,12 +534,12 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
q <- getStoreMsgQueue "ACK" queueId
case s of
Sub {subThread = ProhibitSub} -> do
msgDeleted <- atomically $ tryDelMsg q msgId
when msgDeleted updateStats
deletedMsg_ <- atomically $ tryDelMsg q msgId
mapM_ updateStats deletedMsg_
pure ok
_ -> do
(msgDeleted, msg_) <- atomically $ tryDelPeekMsg q msgId
when msgDeleted updateStats
(deletedMsg_, msg_) <- atomically $ tryDelPeekMsg q msgId
mapM_ updateStats deletedMsg_
deliverMessage "ACK" qr queueId sub q msg_
_ -> pure $ err NO_MSG
where
@@ -525,11 +550,17 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
if msgId == msgId' || B.null msgId
then pure $ Just s
else putTMVar delivered msgId' $> Nothing
updateStats :: m ()
updateStats = do
stats <- asks serverStats
atomically $ modifyTVar' (msgRecv stats) (+ 1)
atomically $ updatePeriodStats (activeQueues stats) queueId
updateStats :: Message -> m ()
updateStats = \case
MessageQuota {} -> pure ()
Message {msgFlags} -> do
stats <- asks serverStats
atomically $ modifyTVar' (msgRecv stats) (+ 1)
atomically $ modifyTVar' (msgCount stats) (+ 1)
atomically $ updatePeriodStats (activeQueues stats) queueId
when (notification msgFlags) $ do
atomically $ modifyTVar' (msgRecvNtf stats) (+ 1)
atomically $ updatePeriodStats (activeQueuesNtf stats) queueId
sendMessage :: QueueRec -> MsgFlags -> MsgBody -> m (Transmission BrokerMsg)
sendMessage qr msgFlags msgBody
@@ -547,10 +578,13 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
case msg_ of
Nothing -> pure $ err QUOTA
Just msg -> time "SEND ok" $ do
when (notification msgFlags) $
atomically . trySendNotification msg =<< asks idsDrg
stats <- asks serverStats
when (notification msgFlags) $ do
atomically . trySendNotification msg =<< asks idsDrg
atomically $ modifyTVar' (msgSentNtf stats) (+ 1)
atomically $ updatePeriodStats (activeQueuesNtf stats) (recipientId qr)
atomically $ modifyTVar' (msgSent stats) (+ 1)
atomically $ modifyTVar' (msgCount stats) (subtract 1)
atomically $ updatePeriodStats (activeQueues stats) (recipientId qr)
pure ok
where
@@ -647,6 +681,7 @@ client clnt@Client {thVersion, subscriptions, ntfSubscriptions, rcvQ, sndQ} Serv
ms <- asks msgStore
stats <- asks serverStats
atomically $ modifyTVar' (qDeleted stats) (+ 1)
atomically $ modifyTVar' (qCount stats) (subtract 1)
atomically $
deleteQueue st queueId >>= \case
Left e -> pure $ err e
@@ -726,7 +761,10 @@ restoreServerMessages = asks (storeMsgsFile . config) >>= mapM_ restoreMessages
full <- atomically $ do
q <- getMsgQueue ms rId quota
isNothing <$> writeMsg q msg
when full . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (msgId (msg :: Message))
case msg of
Message {} ->
when full . logError . decodeLatin1 $ "message queue " <> strEncode rId <> " is full, message not restored: " <> strEncode (msgId (msg :: Message))
MessageQuota {} -> pure ()
updateMsgV1toV3 QueueRec {rcvDhSecret} RcvMessage {msgId, msgTs, msgFlags, msgBody = EncRcvMsgBody body} = do
let nonce = C.cbNonce msgId
msgBody <- liftEither . first (msgErr "v1 message decryption") $ C.maxLenBS =<< C.cbDecrypt rcvDhSecret nonce body
@@ -752,7 +790,9 @@ restoreServerStats = asks (serverStatsBackupFile . config) >>= mapM_ restoreStat
liftIO (strDecode <$> B.readFile f) >>= \case
Right d -> do
s <- asks serverStats
atomically $ setServerStats s d
_qCount <- fmap (length . M.keys) . readTVarIO . queues =<< asks queueStore
_msgCount <- foldM (\n q -> (n +) <$> readTVarIO (size q)) 0 =<< readTVarIO =<< asks msgStore
atomically $ setServerStats s d {_qCount, _msgCount}
renameFile f $ f <> ".bak"
logInfo "server stats restored"
Left e -> do
+10 -10
View File
@@ -8,7 +8,7 @@
module Simplex.Messaging.Server.MsgStore.STM
( STMMsgStore,
MsgQueue,
MsgQueue (..),
newMsgStore,
getMsgQueue,
delMsgQueue,
@@ -86,22 +86,22 @@ peekMsg :: MsgQueue -> STM Message
peekMsg = peekTQueue . msgQueue
{-# INLINE peekMsg #-}
tryDelMsg :: MsgQueue -> MsgId -> STM Bool
tryDelMsg :: MsgQueue -> MsgId -> STM (Maybe Message)
tryDelMsg mq msgId' =
tryPeekMsg mq >>= \case
Just msg
| msgId msg == msgId' || B.null msgId' -> tryDeleteMsg mq >> pure True
| otherwise -> pure False
_ -> pure False
msg_@(Just msg)
| msgId msg == msgId' || B.null msgId' -> tryDeleteMsg mq >> pure msg_
| otherwise -> pure Nothing
_ -> pure Nothing
-- atomic delete (== read) last and peek next message if available
tryDelPeekMsg :: MsgQueue -> MsgId -> STM (Bool, Maybe Message)
tryDelPeekMsg :: MsgQueue -> MsgId -> STM (Maybe Message, Maybe Message)
tryDelPeekMsg mq msgId' =
tryPeekMsg mq >>= \case
msg_@(Just msg)
| msgId msg == msgId' || B.null msgId' -> (True,) <$> (tryDeleteMsg mq >> tryPeekMsg mq)
| otherwise -> pure (False, msg_)
_ -> pure (False, Nothing)
| msgId msg == msgId' || B.null msgId' -> (msg_,) <$> (tryDeleteMsg mq >> tryPeekMsg mq)
| otherwise -> pure (Nothing, msg_)
_ -> pure (Nothing, Nothing)
deleteExpiredMsgs :: MsgQueue -> Int64 -> STM ()
deleteExpiredMsgs mq old = loop
+56 -17
View File
@@ -1,3 +1,4 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
@@ -5,7 +6,7 @@
module Simplex.Messaging.Server.Stats where
import Control.Applicative (optional)
import Control.Applicative (optional, (<|>))
import qualified Data.Attoparsec.ByteString.Char8 as A
import qualified Data.ByteString.Char8 as B
import Data.Set (Set)
@@ -24,7 +25,12 @@ data ServerStats = ServerStats
qDeleted :: TVar Int,
msgSent :: TVar Int,
msgRecv :: TVar Int,
activeQueues :: PeriodStats RecipientId
activeQueues :: PeriodStats RecipientId,
msgSentNtf :: TVar Int,
msgRecvNtf :: TVar Int,
activeQueuesNtf :: PeriodStats RecipientId,
qCount :: TVar Int,
msgCount :: TVar Int
}
data ServerStatsData = ServerStatsData
@@ -34,7 +40,12 @@ data ServerStatsData = ServerStatsData
_qDeleted :: Int,
_msgSent :: Int,
_msgRecv :: Int,
_activeQueues :: PeriodStatsData RecipientId
_activeQueues :: PeriodStatsData RecipientId,
_msgSentNtf :: Int,
_msgRecvNtf :: Int,
_activeQueuesNtf :: PeriodStatsData RecipientId,
_qCount :: Int,
_msgCount :: Int
}
newServerStats :: UTCTime -> STM ServerStats
@@ -46,7 +57,12 @@ newServerStats ts = do
msgSent <- newTVar 0
msgRecv <- newTVar 0
activeQueues <- newPeriodStats
pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues}
msgSentNtf <- newTVar 0
msgRecvNtf <- newTVar 0
activeQueuesNtf <- newPeriodStats
qCount <- newTVar 0
msgCount <- newTVar 0
pure ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount}
getServerStatsData :: ServerStats -> STM ServerStatsData
getServerStatsData s = do
@@ -57,7 +73,12 @@ getServerStatsData s = do
_msgSent <- readTVar $ msgSent s
_msgRecv <- readTVar $ msgRecv s
_activeQueues <- getPeriodStatsData $ activeQueues s
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues}
_msgSentNtf <- readTVar $ msgSentNtf s
_msgRecvNtf <- readTVar $ msgRecvNtf s
_activeQueuesNtf <- getPeriodStatsData $ activeQueuesNtf s
_qCount <- readTVar $ qCount s
_msgCount <- readTVar $ msgCount s
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues, _msgSentNtf, _msgRecvNtf, _activeQueuesNtf, _qCount, _msgCount}
setServerStats :: ServerStats -> ServerStatsData -> STM ()
setServerStats s d = do
@@ -67,10 +88,15 @@ setServerStats s d = do
writeTVar (qDeleted s) $! _qDeleted d
writeTVar (msgSent s) $! _msgSent d
writeTVar (msgRecv s) $! _msgRecv d
setPeriodStats (activeQueues s) (_activeQueues d)
setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d)
writeTVar (msgSentNtf s) $! _msgSentNtf d
writeTVar (msgRecvNtf s) $! _msgRecvNtf d
setPeriodStats (activeQueuesNtf s) (_activeQueuesNtf d)
writeTVar (qCount s) $! _qCount d
writeTVar (msgCount s) $! _qCount d
instance StrEncoding ServerStatsData where
strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues} =
strEncode ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf} =
B.unlines
[ "fromTime=" <> strEncode _fromTime,
"qCreated=" <> strEncode _qCreated,
@@ -78,8 +104,12 @@ instance StrEncoding ServerStatsData where
"qDeleted=" <> strEncode _qDeleted,
"msgSent=" <> strEncode _msgSent,
"msgRecv=" <> strEncode _msgRecv,
"msgSentNtf=" <> strEncode _msgSentNtf,
"msgRecvNtf=" <> strEncode _msgRecvNtf,
"activeQueues:",
strEncode _activeQueues
strEncode _activeQueues,
"activeQueuesNtf:",
strEncode _activeQueuesNtf
]
strP = do
_fromTime <- "fromTime=" *> strP <* A.endOfLine
@@ -88,15 +118,21 @@ instance StrEncoding ServerStatsData where
_qDeleted <- "qDeleted=" *> strP <* A.endOfLine
_msgSent <- "msgSent=" *> strP <* A.endOfLine
_msgRecv <- "msgRecv=" *> strP <* A.endOfLine
r <- optional ("activeQueues:" <* A.endOfLine)
_activeQueues <- case r of
Just _ -> strP <* optional A.endOfLine
_ -> do
_day <- "dayMsgQueues=" *> strP <* A.endOfLine
_week <- "weekMsgQueues=" *> strP <* A.endOfLine
_month <- "monthMsgQueues=" *> strP <* optional A.endOfLine
pure PeriodStatsData {_day, _week, _month}
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _activeQueues}
_msgSentNtf <- "msgSentNtf=" *> strP <* A.endOfLine <|> pure 0
_msgRecvNtf <- "msgRecvNtf=" *> strP <* A.endOfLine <|> pure 0
_activeQueues <-
optional ("activeQueues:" <* A.endOfLine) >>= \case
Just _ -> strP <* optional A.endOfLine
_ -> do
_day <- "dayMsgQueues=" *> strP <* A.endOfLine
_week <- "weekMsgQueues=" *> strP <* A.endOfLine
_month <- "monthMsgQueues=" *> strP <* optional A.endOfLine
pure PeriodStatsData {_day, _week, _month}
_activeQueuesNtf <-
optional ("activeQueuesNtf:" <* A.endOfLine) >>= \case
Just _ -> strP <* optional A.endOfLine
_ -> pure newPeriodStatsData
pure ServerStatsData {_fromTime, _qCreated, _qSecured, _qDeleted, _msgSent, _msgRecv, _msgSentNtf, _msgRecvNtf, _activeQueues, _activeQueuesNtf, _qCount = 0, _msgCount = 0}
data PeriodStats a = PeriodStats
{ day :: TVar (Set a),
@@ -117,6 +153,9 @@ data PeriodStatsData a = PeriodStatsData
_month :: Set a
}
newPeriodStatsData :: PeriodStatsData a
newPeriodStatsData = PeriodStatsData {_day = S.empty, _week = S.empty, _month = S.empty}
getPeriodStatsData :: PeriodStats a -> STM (PeriodStatsData a)
getPeriodStatsData s = do
_day <- readTVar $ day s
+13 -5
View File
@@ -52,7 +52,7 @@ serverTests t@(ATransport t') = do
describe "Exceeding queue quota" $ testExceedQueueQuota t'
describe "Store log" $ testWithStoreLog t
describe "Restore messages" $ testRestoreMessages t
describe "Restore messages (v2)" $ testRestoreMessagesV2 t
describe "Restore messages (old / v2)" $ testRestoreMessagesV2 t
describe "Timing of AUTH error" $ testTiming t
describe "Message notifications" $ testMessageNotifications t
describe "Message expiration" $ do
@@ -628,10 +628,12 @@ testRestoreMessages at@(ATransport t) =
Resp "2" _ OK <- signSendRecv h sKey ("2", sId, _SEND "hello 2")
Resp "3" _ OK <- signSendRecv h sKey ("3", sId, _SEND "hello 3")
Resp "4" _ OK <- signSendRecv h sKey ("4", sId, _SEND "hello 4")
Resp "5" _ OK <- signSendRecv h sKey ("5", sId, _SEND "hello 5")
Resp "6" _ (ERR QUOTA) <- signSendRecv h sKey ("6", sId, _SEND "hello 6")
pure ()
logSize testStoreLogFile `shouldReturn` 2
logSize testStoreMsgsFile `shouldReturn` 3
logSize testStoreMsgsFile `shouldReturn` 5
withSmpServerStoreMsgLogOn at testPort . runTest t $ \h -> do
rId <- readTVarIO recipientId
@@ -647,15 +649,21 @@ testRestoreMessages at@(ATransport t) =
logSize testStoreLogFile `shouldReturn` 1
-- the last message is not removed because it was not ACK'd
logSize testStoreMsgsFile `shouldReturn` 1
logSize testStoreMsgsFile `shouldReturn` 3
withSmpServerStoreMsgLogOn at testPort . runTest t $ \h -> do
rId <- readTVarIO recipientId
Just rKey <- readTVarIO recipientKey
Just dh <- readTVarIO dhShared
let dec = decryptMsgV3 dh
Resp "4" _ (Msg mId4 msg4) <- signSendRecv h rKey ("4", rId, SUB)
Resp "5" _ OK <- signSendRecv h rKey ("5", rId, ACK mId4)
(decryptMsgV3 dh mId4 msg4, Right "hello 4") #== "restored message delivered"
(dec mId4 msg4, Right "hello 4") #== "restored message delivered"
Resp "5" _ (Msg mId5 msg5) <- signSendRecv h rKey ("5", rId, ACK mId4)
(dec mId5 msg5, Right "hello 5") #== "restored message delivered"
Resp "6" _ (Msg mId6 msg6) <- signSendRecv h rKey ("6", rId, ACK mId5)
(dec mId6 msg6, Left "ClientRcvMsgQuota") #== "restored message delivered"
Resp "7" _ OK <- signSendRecv h rKey ("7", rId, ACK mId6)
pure ()
logSize testStoreLogFile `shouldReturn` 1
logSize testStoreMsgsFile `shouldReturn` 0
+21 -21
View File
@@ -1,8 +1,8 @@
{-# LANGUAGE TypeApplications #-}
import AgentTests (agentTests)
-- import Control.Logger.Simple
import CLITests
import Control.Logger.Simple
import CoreTests.CryptoTests
import CoreTests.EncodingTests
import CoreTests.ProtocolErrorTests
@@ -16,26 +16,26 @@ import System.Directory (createDirectoryIfMissing, removeDirectoryRecursive)
import System.Environment (setEnv)
import Test.Hspec
-- logCfg :: LogConfig
-- logCfg = LogConfig {lc_file = Nothing, lc_stderr = True}
logCfg :: LogConfig
logCfg = LogConfig {lc_file = Nothing, lc_stderr = True}
main :: IO ()
main = do
-- setLogLevel LogInfo -- LogError
-- withGlobalLogging logCfg $ do
createDirectoryIfMissing False "tests/tmp"
setEnv "APNS_KEY_ID" "H82WD9K9AQ"
setEnv "APNS_KEY_FILE" "./tests/fixtures/AuthKey_H82WD9K9AQ.p8"
hspec $ do
describe "Core tests" $ do
describe "Encoding tests" encodingTests
describe "Protocol error tests" protocolErrorTests
describe "Version range" versionRangeTests
describe "Encryption tests" cryptoTests
describe "Retry interval tests" retryIntervalTests
describe "SMP server via TLS" $ serverTests (transport @TLS)
describe "SMP server via WebSockets" $ serverTests (transport @WS)
describe "Notifications server" $ ntfServerTests (transport @TLS)
describe "SMP client agent" $ agentTests (transport @TLS)
describe "Server CLIs" cliTests
removeDirectoryRecursive "tests/tmp"
setLogLevel LogError -- LogInfo
withGlobalLogging logCfg $ do
createDirectoryIfMissing False "tests/tmp"
setEnv "APNS_KEY_ID" "H82WD9K9AQ"
setEnv "APNS_KEY_FILE" "./tests/fixtures/AuthKey_H82WD9K9AQ.p8"
hspec $ do
describe "Core tests" $ do
describe "Encoding tests" encodingTests
describe "Protocol error tests" protocolErrorTests
describe "Version range" versionRangeTests
describe "Encryption tests" cryptoTests
describe "Retry interval tests" retryIntervalTests
describe "SMP server via TLS" $ serverTests (transport @TLS)
describe "SMP server via WebSockets" $ serverTests (transport @WS)
describe "Notifications server" $ ntfServerTests (transport @TLS)
describe "SMP client agent" $ agentTests (transport @TLS)
describe "Server CLIs" cliTests
removeDirectoryRecursive "tests/tmp"