servers: blocking records for content moderation (#1430)

* servers: blocking records for content moderation

* update

* encode BLOCKED as AUTH in old versions

* update

* unblock queue command

* test, status command
This commit is contained in:
Evgeny
2025-01-12 19:34:00 +00:00
committed by GitHub
parent 9d9ec8cd0b
commit 3d4e0b06c0
20 changed files with 415 additions and 94 deletions
+13 -4
View File
@@ -5,7 +5,7 @@ module Simplex.Messaging.Server.Control where
import qualified Data.Attoparsec.ByteString.Char8 as A
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Protocol (BasicAuth, SenderId)
import Simplex.Messaging.Protocol (BasicAuth, BlockingInfo, SenderId)
data CPClientRole = CPRNone | CPRUser | CPRAdmin
deriving (Eq)
@@ -22,6 +22,9 @@ data ControlProtocol
| CPSocketThreads
| CPServerInfo
| CPDelete SenderId
| CPStatus SenderId
| CPBlock SenderId BlockingInfo
| CPUnblock SenderId
| CPSave
| CPHelp
| CPQuit
@@ -39,14 +42,17 @@ instance StrEncoding ControlProtocol where
CPSockets -> "sockets"
CPSocketThreads -> "socket-threads"
CPServerInfo -> "server-info"
CPDelete bs -> "delete " <> strEncode bs
CPDelete sId -> "delete " <> strEncode sId
CPStatus sId -> "status " <> strEncode sId
CPBlock sId info -> "block " <> strEncode sId <> " " <> strEncode info
CPUnblock sId -> "unblock " <> strEncode sId
CPSave -> "save"
CPHelp -> "help"
CPQuit -> "quit"
CPSkip -> ""
strP =
A.takeTill (== ' ') >>= \case
"auth" -> CPAuth <$> (A.space *> strP)
"auth" -> CPAuth <$> _strP
"suspend" -> pure CPSuspend
"resume" -> pure CPResume
"clients" -> pure CPClients
@@ -56,7 +62,10 @@ instance StrEncoding ControlProtocol where
"sockets" -> pure CPSockets
"socket-threads" -> pure CPSocketThreads
"server-info" -> pure CPServerInfo
"delete" -> CPDelete <$> (A.space *> strP)
"delete" -> CPDelete <$> _strP
"status" -> CPStatus <$> _strP
"block" -> CPBlock <$> _strP <*> _strP
"unblock" -> CPUnblock <$> _strP
"save" -> pure CPSave
"help" -> pure CPHelp
"quit" -> pure CPQuit
@@ -56,6 +56,7 @@ prometheusMetrics sm rtm ts =
_qDeletedAllB,
_qDeletedNew,
_qDeletedSecured,
_qBlocked,
_qSub,
_qSubAllB,
_qSubAuth,
@@ -74,6 +75,7 @@ prometheusMetrics sm rtm ts =
_msgSentAuth,
_msgSentQuota,
_msgSentLarge,
_msgSentBlock,
_msgRecv,
_msgRecvGet,
_msgGet,
@@ -122,6 +124,10 @@ prometheusMetrics sm rtm ts =
\simplex_smp_queues_deleted{type=\"new\"} " <> mshow _qDeletedNew <> "\n# qDeletedNew\n\
\simplex_smp_queues_deleted{type=\"secured\"} " <> mshow _qDeletedSecured <> "\n# qDeletedSecured\n\
\\n\
\# HELP simplex_smp_queues_deleted Deleted queues\n\
\# TYPE simplex_smp_queues_deleted counter\n\
\simplex_smp_queues_blocked " <> mshow _qBlocked <> "\n# qBlocked\n\
\\n\
\# HELP simplex_smp_queues_deleted_batch Batched requests to delete queues\n\
\# TYPE simplex_smp_queues_deleted_batch counter\n\
\simplex_smp_queues_deleted_batch " <> mshow _qDeletedAllB <> "\n# qDeletedAllB\n\
@@ -197,6 +203,7 @@ prometheusMetrics sm rtm ts =
\simplex_smp_messages_sent_errors{type=\"auth\"} " <> mshow _msgSentAuth <> "\n# msgSentAuth\n\
\simplex_smp_messages_sent_errors{type=\"quota\"} " <> mshow _msgSentQuota <> "\n# msgSentQuota\n\
\simplex_smp_messages_sent_errors{type=\"large\"} " <> mshow _msgSentLarge <> "\n# msgSentLarge\n\
\simplex_smp_messages_sent_errors{type=\"block\"} " <> mshow _msgSentBlock <> "\n# msgSentBlock\n\
\\n\
\# HELP simplex_smp_messages_received Received messages.\n\
\# TYPE simplex_smp_messages_received counter\n\
+20 -2
View File
@@ -1,11 +1,15 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
module Simplex.Messaging.Server.QueueStore where
import Control.Applicative ((<|>))
import Data.Functor (($>))
import Data.Int (Int64)
import Data.Time.Clock.System (SystemTime (..), getSystemTime)
import Simplex.Messaging.Encoding.String
@@ -19,7 +23,7 @@ data QueueRec = QueueRec
senderKey :: !(Maybe SndPublicAuthKey),
sndSecure :: !SenderCanSecure,
notifier :: !(Maybe NtfCreds),
status :: !ServerQueueStatus,
status :: !ServerEntityStatus,
updatedAt :: !(Maybe RoundedSystemTime)
}
deriving (Show)
@@ -37,7 +41,21 @@ instance StrEncoding NtfCreds where
(notifierId, notifierKey, rcvNtfDhSecret) <- strP
pure NtfCreds {notifierId, notifierKey, rcvNtfDhSecret}
data ServerQueueStatus = QueueActive | QueueOff deriving (Eq, Show)
data ServerEntityStatus
= EntityActive
| EntityBlocked BlockingInfo
| EntityOff
deriving (Eq, Show)
instance StrEncoding ServerEntityStatus where
strEncode = \case
EntityActive -> "active"
EntityBlocked info -> "blocked," <> strEncode info
EntityOff -> "off"
strP =
"active" $> EntityActive
<|> "blocked," *> (EntityBlocked <$> strP)
<|> "off" $> EntityOff
newtype RoundedSystemTime = RoundedSystemTime Int64
deriving (Eq, Ord, Show)
+25 -1
View File
@@ -21,6 +21,8 @@ module Simplex.Messaging.Server.QueueStore.STM
addQueueNotifier,
deleteQueueNotifier,
suspendQueue,
blockQueue,
unblockQueue,
updateQueueTime,
deleteQueue',
readQueueStore,
@@ -118,7 +120,27 @@ suspendQueue st sq =
where
qr = queueRec' sq
suspend q = do
writeTVar qr $! Just q {status = QueueOff}
writeTVar qr $! Just q {status = EntityOff}
pure $ recipientId q
blockQueue :: STMQueueStore s => s -> StoreQueue s -> BlockingInfo -> IO (Either ErrorType ())
blockQueue st sq info =
atomically (readQueueRec qr >>= mapM block)
$>>= \rId -> withLog "blockQueue" st (\sl -> logBlockQueue sl rId info)
where
qr = queueRec' sq
block q = do
writeTVar qr $ Just q {status = EntityBlocked info}
pure $ recipientId q
unblockQueue :: STMQueueStore s => s -> StoreQueue s -> IO (Either ErrorType ())
unblockQueue st sq =
atomically (readQueueRec qr >>= mapM unblock)
$>>= \rId -> withLog "unblockQueue" st (`logUnblockQueue` rId)
where
qr = queueRec' sq
unblock q = do
writeTVar qr $ Just q {status = EntityActive}
pure $ recipientId q
updateQueueTime :: STMQueueStore s => s -> StoreQueue s -> RoundedSystemTime -> IO (Either ErrorType QueueRec)
@@ -177,6 +199,8 @@ readQueueStore f st = withFile f ReadMode $ LB.hGetContents >=> mapM_ processLin
SecureQueue qId sKey -> withQueue qId "SecureQueue" $ \q -> secureQueue st q sKey
AddNotifier qId ntfCreds -> withQueue qId "AddNotifier" $ \q -> addQueueNotifier st q ntfCreds
SuspendQueue qId -> withQueue qId "SuspendQueue" $ suspendQueue st
BlockQueue qId info -> withQueue qId "BlockQueue" $ \q -> blockQueue st q info
UnblockQueue qId -> withQueue qId "UnblockQueue" $ unblockQueue st
DeleteQueue qId -> withQueue qId "DeleteQueue" $ deleteQueue st qId
DeleteNotifier qId -> withQueue qId "DeleteNotifier" $ deleteQueueNotifier st
UpdateTime qId t -> withQueue qId "UpdateTime" $ \q -> updateQueueTime st q t
+20
View File
@@ -34,6 +34,7 @@ data ServerStats = ServerStats
qDeletedAllB :: IORef Int,
qDeletedNew :: IORef Int,
qDeletedSecured :: IORef Int,
qBlocked :: IORef Int,
qSub :: IORef Int, -- only includes subscriptions when there were pending messages
-- qSubNoMsg :: IORef Int, -- this stat creates too many STM transactions
qSubAllB :: IORef Int, -- count of all subscription batches (with and without pending messages)
@@ -53,6 +54,7 @@ data ServerStats = ServerStats
msgSentAuth :: IORef Int,
msgSentQuota :: IORef Int,
msgSentLarge :: IORef Int,
msgSentBlock :: IORef Int,
msgRecv :: IORef Int,
msgRecvGet :: IORef Int,
msgGet :: IORef Int,
@@ -89,6 +91,7 @@ data ServerStatsData = ServerStatsData
_qDeletedAllB :: Int,
_qDeletedNew :: Int,
_qDeletedSecured :: Int,
_qBlocked :: Int,
_qSub :: Int,
_qSubAllB :: Int,
_qSubAuth :: Int,
@@ -107,6 +110,7 @@ data ServerStatsData = ServerStatsData
_msgSentAuth :: Int,
_msgSentQuota :: Int,
_msgSentLarge :: Int,
_msgSentBlock :: Int,
_msgRecv :: Int,
_msgRecvGet :: Int,
_msgGet :: Int,
@@ -144,6 +148,7 @@ newServerStats ts = do
qDeletedAllB <- newIORef 0
qDeletedNew <- newIORef 0
qDeletedSecured <- newIORef 0
qBlocked <- newIORef 0
qSub <- newIORef 0
qSubAllB <- newIORef 0
qSubAuth <- newIORef 0
@@ -162,6 +167,7 @@ newServerStats ts = do
msgSentAuth <- newIORef 0
msgSentQuota <- newIORef 0
msgSentLarge <- newIORef 0
msgSentBlock <- newIORef 0
msgRecv <- newIORef 0
msgRecvGet <- newIORef 0
msgGet <- newIORef 0
@@ -196,6 +202,7 @@ newServerStats ts = do
qDeletedAllB,
qDeletedNew,
qDeletedSecured,
qBlocked,
qSub,
qSubAllB,
qSubAuth,
@@ -214,6 +221,7 @@ newServerStats ts = do
msgSentAuth,
msgSentQuota,
msgSentLarge,
msgSentBlock,
msgRecv,
msgRecvGet,
msgGet,
@@ -250,6 +258,7 @@ getServerStatsData s = do
_qDeletedAllB <- readIORef $ qDeletedAllB s
_qDeletedNew <- readIORef $ qDeletedNew s
_qDeletedSecured <- readIORef $ qDeletedSecured s
_qBlocked <- readIORef $ qBlocked s
_qSub <- readIORef $ qSub s
_qSubAllB <- readIORef $ qSubAllB s
_qSubAuth <- readIORef $ qSubAuth s
@@ -268,6 +277,7 @@ getServerStatsData s = do
_msgSentAuth <- readIORef $ msgSentAuth s
_msgSentQuota <- readIORef $ msgSentQuota s
_msgSentLarge <- readIORef $ msgSentLarge s
_msgSentBlock <- readIORef $ msgSentBlock s
_msgRecv <- readIORef $ msgRecv s
_msgRecvGet <- readIORef $ msgRecvGet s
_msgGet <- readIORef $ msgGet s
@@ -302,6 +312,7 @@ getServerStatsData s = do
_qDeletedAllB,
_qDeletedNew,
_qDeletedSecured,
_qBlocked,
_qSub,
_qSubAllB,
_qSubAuth,
@@ -320,6 +331,7 @@ getServerStatsData s = do
_msgSentAuth,
_msgSentQuota,
_msgSentLarge,
_msgSentBlock,
_msgRecv,
_msgRecvGet,
_msgGet,
@@ -357,6 +369,7 @@ setServerStats s d = do
writeIORef (qDeletedAllB s) $! _qDeletedAllB d
writeIORef (qDeletedNew s) $! _qDeletedNew d
writeIORef (qDeletedSecured s) $! _qDeletedSecured d
writeIORef (qBlocked s) $! _qBlocked d
writeIORef (qSub s) $! _qSub d
writeIORef (qSubAllB s) $! _qSubAllB d
writeIORef (qSubAuth s) $! _qSubAuth d
@@ -375,6 +388,7 @@ setServerStats s d = do
writeIORef (msgSentAuth s) $! _msgSentAuth d
writeIORef (msgSentQuota s) $! _msgSentQuota d
writeIORef (msgSentLarge s) $! _msgSentLarge d
writeIORef (msgSentBlock s) $! _msgSentBlock d
writeIORef (msgRecv s) $! _msgRecv d
writeIORef (msgRecvGet s) $! _msgRecvGet d
writeIORef (msgGet s) $! _msgGet d
@@ -411,6 +425,7 @@ instance StrEncoding ServerStatsData where
"qDeletedNew=" <> strEncode (_qDeletedNew d),
"qDeletedSecured=" <> strEncode (_qDeletedSecured d),
"qDeletedAllB=" <> strEncode (_qDeletedAllB d),
"qBlocked=" <> strEncode (_qBlocked d),
"qCount=" <> strEncode (_qCount d),
"qSub=" <> strEncode (_qSub d),
"qSubAllB=" <> strEncode (_qSubAllB d),
@@ -430,6 +445,7 @@ instance StrEncoding ServerStatsData where
"msgSentAuth=" <> strEncode (_msgSentAuth d),
"msgSentQuota=" <> strEncode (_msgSentQuota d),
"msgSentLarge=" <> strEncode (_msgSentLarge d),
"msgSentBlock=" <> strEncode (_msgSentBlock d),
"msgRecv=" <> strEncode (_msgRecv d),
"msgRecvGet=" <> strEncode (_msgRecvGet d),
"msgGet=" <> strEncode (_msgGet d),
@@ -467,6 +483,7 @@ instance StrEncoding ServerStatsData where
(,0,0) <$> ("qDeleted=" *> strP <* A.endOfLine)
<|> ((,,) <$> ("qDeletedAll=" *> strP <* A.endOfLine) <*> ("qDeletedNew=" *> strP <* A.endOfLine) <*> ("qDeletedSecured=" *> strP <* A.endOfLine))
_qDeletedAllB <- opt "qDeletedAllB="
_qBlocked <- opt "qBlocked="
_qCount <- opt "qCount="
_qSub <- opt "qSub="
_qSubNoMsg <- skipInt "qSubNoMsg=" -- skipping it for backward compatibility
@@ -487,6 +504,7 @@ instance StrEncoding ServerStatsData where
_msgSentAuth <- opt "msgSentAuth="
_msgSentQuota <- opt "msgSentQuota="
_msgSentLarge <- opt "msgSentLarge="
_msgSentBlock <- opt "msgSentBlock="
_msgRecv <- "msgRecv=" *> strP <* A.endOfLine
_msgRecvGet <- opt "msgRecvGet="
_msgGet <- opt "msgGet="
@@ -532,6 +550,7 @@ instance StrEncoding ServerStatsData where
_qDeletedAllB,
_qDeletedNew,
_qDeletedSecured,
_qBlocked,
_qSub,
_qSubAllB,
_qSubAuth,
@@ -550,6 +569,7 @@ instance StrEncoding ServerStatsData where
_msgSentAuth,
_msgSentQuota,
_msgSentLarge,
_msgSentBlock,
_msgRecv,
_msgRecvGet,
_msgGet,
+38 -14
View File
@@ -21,6 +21,8 @@ module Simplex.Messaging.Server.StoreLog
logSecureQueue,
logAddNotifier,
logSuspendQueue,
logBlockQueue,
logUnblockQueue,
logDeleteQueue,
logDeleteNotifier,
logUpdateQueueTime,
@@ -33,9 +35,9 @@ import Control.Applicative (optional, (<|>))
import Control.Concurrent.STM
import qualified Control.Exception as E
import Control.Logger.Simple
import Control.Monad
import qualified Data.Attoparsec.ByteString.Char8 as A
import qualified Data.ByteString.Char8 as B
import Data.Functor (($>))
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import Data.Time.Clock (getCurrentTime)
@@ -56,6 +58,8 @@ data StoreLogRecord
| SecureQueue QueueId SndPublicAuthKey
| AddNotifier QueueId NtfCreds
| SuspendQueue QueueId
| BlockQueue QueueId BlockingInfo
| UnblockQueue QueueId
| DeleteQueue QueueId
| DeleteNotifier QueueId
| UpdateTime QueueId RoundedSystemTime
@@ -66,12 +70,14 @@ data SLRTag
| SecureQueue_
| AddNotifier_
| SuspendQueue_
| BlockQueue_
| UnblockQueue_
| DeleteQueue_
| DeleteNotifier_
| UpdateTime_
instance StrEncoding QueueRec where
strEncode QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, updatedAt} =
strEncode QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt} =
B.unwords
[ "rid=" <> strEncode recipientId,
"rk=" <> strEncode recipientKey,
@@ -82,10 +88,14 @@ instance StrEncoding QueueRec where
<> sndSecureStr
<> maybe "" notifierStr notifier
<> maybe "" updatedAtStr updatedAt
<> statusStr
where
sndSecureStr = if sndSecure then " sndSecure=" <> strEncode sndSecure else ""
notifierStr ntfCreds = " notifier=" <> strEncode ntfCreds
updatedAtStr t = " updated_at=" <> strEncode t
statusStr = case status of
EntityActive -> ""
_ -> " status=" <> strEncode status
strP = do
recipientId <- "rid=" *> strP_
@@ -96,7 +106,8 @@ instance StrEncoding QueueRec where
sndSecure <- (" sndSecure=" *> strP) <|> pure False
notifier <- optional $ " notifier=" *> strP
updatedAt <- optional $ " updated_at=" *> strP
pure QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status = QueueActive, updatedAt}
status <- (" status=" *> strP) <|> pure EntityActive
pure QueueRec {recipientId, recipientKey, rcvDhSecret, senderId, senderKey, sndSecure, notifier, status, updatedAt}
instance StrEncoding SLRTag where
strEncode = \case
@@ -104,20 +115,24 @@ instance StrEncoding SLRTag where
SecureQueue_ -> "SECURE"
AddNotifier_ -> "NOTIFIER"
SuspendQueue_ -> "SUSPEND"
BlockQueue_ -> "BLOCK"
UnblockQueue_ -> "UNBLOCK"
DeleteQueue_ -> "DELETE"
DeleteNotifier_ -> "NDELETE"
UpdateTime_ -> "TIME"
strP =
A.takeTill (== ' ') >>= \case
"CREATE" -> pure CreateQueue_
"SECURE" -> pure SecureQueue_
"NOTIFIER" -> pure AddNotifier_
"SUSPEND" -> pure SuspendQueue_
"DELETE" -> pure DeleteQueue_
"NDELETE" -> pure DeleteNotifier_
"TIME" -> pure UpdateTime_
s -> fail $ "invalid log record tag: " <> B.unpack s
A.choice
[ "CREATE" $> CreateQueue_,
"SECURE" $> SecureQueue_,
"NOTIFIER" $> AddNotifier_,
"SUSPEND" $> SuspendQueue_,
"BLOCK" $> BlockQueue_,
"UNBLOCK" $> UnblockQueue_,
"DELETE" $> DeleteQueue_,
"NDELETE" $> DeleteNotifier_,
"TIME" $> UpdateTime_
]
instance StrEncoding StoreLogRecord where
strEncode = \case
@@ -125,6 +140,8 @@ instance StrEncoding StoreLogRecord where
SecureQueue rId sKey -> strEncode (SecureQueue_, rId, sKey)
AddNotifier rId ntfCreds -> strEncode (AddNotifier_, rId, ntfCreds)
SuspendQueue rId -> strEncode (SuspendQueue_, rId)
BlockQueue rId info -> strEncode (BlockQueue_, rId, info)
UnblockQueue rId -> strEncode (UnblockQueue_, rId)
DeleteQueue rId -> strEncode (DeleteQueue_, rId)
DeleteNotifier rId -> strEncode (DeleteNotifier_, rId)
UpdateTime rId t -> strEncode (UpdateTime_, rId, t)
@@ -135,6 +152,8 @@ instance StrEncoding StoreLogRecord where
SecureQueue_ -> SecureQueue <$> strP_ <*> strP
AddNotifier_ -> AddNotifier <$> strP_ <*> strP
SuspendQueue_ -> SuspendQueue <$> strP
BlockQueue_ -> BlockQueue <$> strP_ <*> strP
UnblockQueue_ -> UnblockQueue <$> strP
DeleteQueue_ -> DeleteQueue <$> strP
DeleteNotifier_ -> DeleteNotifier <$> strP
UpdateTime_ -> UpdateTime <$> strP_ <*> strP
@@ -179,6 +198,12 @@ logAddNotifier s qId ntfCreds = writeStoreLogRecord s $ AddNotifier qId ntfCreds
logSuspendQueue :: StoreLog 'WriteMode -> QueueId -> IO ()
logSuspendQueue s = writeStoreLogRecord s . SuspendQueue
logBlockQueue :: StoreLog 'WriteMode -> QueueId -> BlockingInfo -> IO ()
logBlockQueue s qId info = writeStoreLogRecord s $ BlockQueue qId info
logUnblockQueue :: StoreLog 'WriteMode -> QueueId -> IO ()
logUnblockQueue s = writeStoreLogRecord s . UnblockQueue
logDeleteQueue :: StoreLog 'WriteMode -> QueueId -> IO ()
logDeleteQueue s = writeStoreLogRecord s . DeleteQueue
@@ -228,6 +253,5 @@ writeQueueStore s st = readTVarIO (activeMsgQueues st) >>= mapM_ writeQueue . M.
where
writeQueue (rId, q) =
readTVarIO (queueRec' q) >>= \case
Just q' -> when (active q') $ logCreateQueue s q' -- TODO we should log suspended queues when we use them
Just q' -> logCreateQueue s q'
Nothing -> atomically $ TM.delete rId $ activeMsgQueues st
active QueueRec {status} = status == QueueActive