Files
simplexmq/tests/CoreTests/TRcvQueuesTests.hs
Evgeny 5241f5fe5e rfc: client certificates for servers using SMP protocol as clients (operators' chat relays, notification servers, service bots) (#1534)
* rfc: client certificates for high volume clients (operators' chat relays, notification servers, service bots)

* client certificates types (WIP)

* parameterize Transport

* protocol/schema/api changes

* agent API

* rename command

* agent subscriptions return local ClientServiceId to chat

* verify transmissions

* fix receiving client certificates, refactor

* ntf server: remove shared queue for all notification subscriptions (#1543)

* ntf server: remove shared queue for all notification subscriptions

* wait for subscriber with timeout

* safer

* refactor

* log

* remove unused

* WIP service subscriptions and associations, refactor

* process service subscriptions

* rename

* simplify switching subscriptions

* SMP service handshake with additional server handshake response

* notification delivery and STM persistence for services

* smp server: database storage, store log, fix encoding for STORE error, replace String with Text in locks and error

* stats

* more stats

* rename SMP commands

* service subscriptions in ntf server agent (tests fail)

* fix

* refactor

* exports

* subscribe ntf server as service for associated queues

* test ntf service connection, fix SOKS response, fix service associations not removed in STM storage

* INI option to support services

* ntf server: downgrade subscriptions when service is no longer supported, track counts of subscribed queues

* smp protocol: include service certificate fingerprint in the string signed over with entity key (TODO two tests fail)

* fix test

* ntf server prometheus stats, use Int64 in SOKS/ENDS responses (to avoid conversions), additional error status for ntf subscription

* update RFC

* refactor useServiceAuth to avoid ad hoc decisions about which commands use service signatures, and to prohibit service signatures on other commands

* remove duplicate service signature syntax check from checkCredentials, it is checked in verifyTransmission

* service errors, todos

* fix checkCredentials in ntf server, service errors

* refactor service auth

* refactor

* service agent: store returned queue count instead of expected

* refactor serverThread

* refactor serviceSig

* rename

* refactor, rename, test repeat NSUB service association

* respond with error to SUBS

* smp server: export/import service records between database and store log

* comment

* comments

* ghc 8.10.7
2025-06-06 08:03:47 +01:00

214 lines
9.3 KiB
Haskell

{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -Wno-orphans #-}
module CoreTests.TRcvQueuesTests where
import AgentTests.EqInstances ()
import qualified Data.ByteString.Char8 as B
import qualified Data.List.NonEmpty as L
import qualified Data.Map as M
import qualified Data.Set as S
import Data.String (IsString (..))
import Simplex.Messaging.Agent.Protocol (ConnId, QueueStatus (..), UserId)
import Simplex.Messaging.Agent.Store (RcvQueue, StoredRcvQueue (..))
import Simplex.Messaging.Agent.Store.Entity
import qualified Simplex.Messaging.Agent.TRcvQueues as RQ
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Protocol (EntityId (..), QueueMode (..), RecipientId, SMPServer, pattern NoEntity, pattern VersionSMPC)
import Test.Hspec hiding (fit, it)
import UnliftIO
import Util
-- | Hspec suite for "Simplex.Messaging.Agent.TRcvQueues", grouped into
-- connection API, session API and queue transfer tests.
tRcvQueuesTests :: Spec
tRcvQueuesTests = do
  describe "connection API" connectionApi
  describe "session API" sessionApi
  describe "queue transfer" queueTransfer
  where
    connectionApi :: Spec
    connectionApi = do
      it "hasConn" hasConnTest
      it "hasConn, batch add" hasConnTestBatch
      it "hasConn, batch idempotent" batchIdempotentTest
      it "deleteConn" deleteConnTest
    sessionApi :: Spec
    sessionApi = do
      it "getSessQueues" getSessQueuesTest
      it "getDelSessQueues" getDelSessQueuesTest
    queueTransfer :: Spec
    queueTransfer =
      it "getDelSessQueues-batchAddQueues preserves total length" removeSubsTest
-- | Orphan instance so that the tests can write entity IDs as string literals.
instance IsString EntityId where
  fromString str = EntityId (B.pack str)
-- | Reads both maps of a 'RQ.TRcvQueues' in one STM transaction and reports
-- whether they are consistent with each other.
--
-- Returns 'True' only when all three checks below hold.
checkDataInvariant :: RQ.Queue q => RQ.TRcvQueues q -> IO Bool
checkDataInvariant trq = atomically $ do
  conns <- readTVar $ RQ.getConnections trq
  qs <- readTVar $ RQ.getRcvQueues trq
  let queueEntries = M.assocs qs
      -- the keys listed for a connection are exactly the keys of the queues that name this connection
      connKeysMatch cId =
        (S.fromList . L.toList <$> M.lookup cId conns) == Just (M.keysSet (M.filter ((cId ==) . RQ.connId') qs))
      -- every stored queue is listed under its own connection
      queueIsListed (k, q) = maybe False (elem k . L.toList) $ M.lookup (RQ.connId' q) conns
      -- every queue is stored under the key computed from the queue itself
      keyMatchesQueue (k, q) = RQ.qKey q == k
  pure $
    all connKeysMatch (M.keys conns)
      && all queueIsListed queueEntries
      && all keyMatchesQueue queueEntries
-- | Adds three queues one at a time, checking the data invariant after each
-- addition, then checks that 'RQ.hasConn' finds every added connection and
-- does not find an unknown one.
hasConnTest :: IO ()
hasConnTest = do
  trq <- RQ.empty
  let addChecked rq = do
        atomically $ RQ.addQueue rq trq
        checkDataInvariant trq `shouldReturn` True
      connKnown cId = atomically $ RQ.hasConn cId trq
  mapM_
    addChecked
    [ dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1",
      dummyRQ 0 "smp://1234-w==@alpha" "c2" "r2",
      dummyRQ 0 "smp://1234-w==@beta" "c3" "r3"
    ]
  connKnown "c1" `shouldReturn` True
  connKnown "c2" `shouldReturn` True
  connKnown "c3" `shouldReturn` True
  connKnown "nope" `shouldReturn` False
-- | Same checks as the one-by-one test, but the three queues are added with a
-- single 'RQ.batchAddQueues' call.
hasConnTestBatch :: IO ()
hasConnTestBatch = do
  trq <- RQ.empty
  atomically . RQ.batchAddQueues trq $
    [ dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1",
      dummyRQ 0 "smp://1234-w==@alpha" "c2" "r2",
      dummyRQ 0 "smp://1234-w==@beta" "c3" "r3"
    ]
  checkDataInvariant trq `shouldReturn` True
  let connKnown cId = atomically $ RQ.hasConn cId trq
  connKnown "c1" `shouldReturn` True
  connKnown "c2" `shouldReturn` True
  connKnown "c3" `shouldReturn` True
  connKnown "nope" `shouldReturn` False
-- | Adds the same batch twice and checks that the queue map is unchanged by
-- the second addition, and that the connection map is unchanged once
-- duplicate entries are removed.
batchIdempotentTest :: IO ()
batchIdempotentTest = do
  trq <- RQ.empty
  let batch =
        [ dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1",
          dummyRQ 0 "smp://1234-w==@alpha" "c2" "r2",
          dummyRQ 0 "smp://1234-w==@beta" "c3" "r3"
        ]
      addBatch = do
        atomically $ RQ.batchAddQueues trq batch
        checkDataInvariant trq `shouldReturn` True
      currentQueues = readTVarIO $ RQ.getRcvQueues trq
      currentConns = readTVarIO $ RQ.getConnections trq
  addBatch
  -- snapshot taken after the first addition
  queuesBefore <- currentQueues
  connsBefore <- currentConns
  addBatch
  currentQueues `shouldReturn` queuesBefore
  -- connections get duplicated, but that doesn't appear to affect anybody
  (fmap L.nub <$> currentConns) `shouldReturn` connsBefore
-- | Adds three queues in one transaction, deletes an existing and then an
-- unknown connection, and checks that the invariant holds after each step and
-- that only the two remaining connections are left.
deleteConnTest :: IO ()
deleteConnTest = do
  trq <- RQ.empty
  let invariantHolds = checkDataInvariant trq `shouldReturn` True
      deleteChecked cId = atomically (RQ.deleteConn cId trq) >> invariantHolds
  -- all three additions happen inside a single STM transaction
  atomically $
    mapM_
      (`RQ.addQueue` trq)
      [ dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1",
        dummyRQ 0 "smp://1234-w==@alpha" "c2" "r2",
        dummyRQ 0 "smp://1234-w==@beta" "c3" "r3"
      ]
  invariantHolds
  deleteChecked "c1"
  deleteChecked "nope"
  (M.keys <$> readTVarIO (RQ.getConnections trq)) `shouldReturn` ["c2", "c3"]
-- | Adds four queues one at a time (checking the invariant after each), then
-- checks what 'RQ.getSessQueues' and 'RQ.hasSessQueues' report for several
-- transport sessions.
getSessQueuesTest :: IO ()
getSessQueuesTest = do
  trq <- RQ.empty
  let alpha1 = dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1"
      alpha2 = dummyRQ 0 "smp://1234-w==@alpha" "c2" "r2"
      addChecked rq = do
        atomically $ RQ.addQueue rq trq
        checkDataInvariant trq `shouldReturn` True
      -- asserts both the returned queues and the presence flag for one session
      sessShouldHave tSess expectedQs expectedAny = do
        RQ.getSessQueues tSess trq `shouldReturn` expectedQs
        atomically (RQ.hasSessQueues tSess trq) `shouldReturn` expectedAny
  mapM_
    addChecked
    [ alpha1,
      alpha2,
      dummyRQ 0 "smp://1234-w==@beta" "c3" "r3",
      dummyRQ 1 "smp://1234-w==@beta" "c4" "r4"
    ]
  sessShouldHave (0, "smp://1234-w==@alpha", Just "c1") [alpha1] True
  sessShouldHave (1, "smp://1234-w==@alpha", Just "c1") [] False
  sessShouldHave (0, "smp://1234-w==@alpha", Just "nope") [] False
  sessShouldHave (0, "smp://1234-w==@alpha", Nothing) [alpha2, alpha1] True
-- | Checks that 'RQ.getDelSessQueues' returns nothing for sessions that match
-- no queues, and that for a matching session it returns the queues and
-- connection IDs and removes only those connections.
getDelSessQueuesTest :: IO ()
getDelSessQueuesTest = do
  trq <- RQ.empty
  let alpha1 = dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1"
      alpha2 = dummyRQ 0 "smp://1234-w==@alpha" "c2" "r2"
      invariantHolds = checkDataInvariant trq `shouldReturn` True
      delSess tSess = atomically $ RQ.getDelSessQueues tSess "1" trq
      connKnown cId = atomically $ RQ.hasConn cId trq
  atomically . RQ.batchAddQueues trq $
    [ ("1", alpha1),
      ("1", alpha2),
      ("1", dummyRQ 0 "smp://1234-w==@beta" "c3" "r3"),
      ("1", dummyRQ 1 "smp://1234-w==@beta" "c4" "r4")
    ]
  invariantHolds
  -- no user
  delSess (2, "smp://1234-w==@alpha", Nothing) `shouldReturn` ([], [])
  invariantHolds
  -- wrong user
  delSess (1, "smp://1234-w==@alpha", Nothing) `shouldReturn` ([], [])
  invariantHolds
  -- connections intact
  connKnown "c1" `shouldReturn` True
  connKnown "c2" `shouldReturn` True
  delSess (0, "smp://1234-w==@alpha", Nothing) `shouldReturn` ([alpha2, alpha1], ["c1", "c2"])
  invariantHolds
  -- connections gone
  connKnown "c1" `shouldReturn` False
  connKnown "c2" `shouldReturn` False
  -- non-matched connections intact
  connKnown "c3" `shouldReturn` True
  connKnown "c4" `shouldReturn` True
-- | Moves queues between two containers via 'RQ.getDelSessQueues' followed by
-- 'RQ.batchAddQueues', and checks after every move that the combined queue
-- and connection counts stay at (4, 4).
removeSubsTest :: IO ()
removeSubsTest = do
  aq <- RQ.empty
  atomically . RQ.batchAddQueues aq $
    [ ("1", dummyRQ 0 "smp://1234-w==@alpha" "c1" "r1"),
      ("1", dummyRQ 0 "smp://1234-w==@alpha" "c2" "r2"),
      ("1", dummyRQ 0 "smp://1234-w==@beta" "c3" "r3"),
      ("1", dummyRQ 1 "smp://1234-w==@beta" "c4" "r4")
    ]
  pq <- RQ.empty
  let totalUnchanged = atomically (totalSize aq pq) `shouldReturn` (4, 4)
      -- removal from aq and re-insertion into pq happen in one STM transaction
      moveSess tSess = atomically $ do
        removed <- fst <$> RQ.getDelSessQueues tSess "1" aq
        RQ.batchAddQueues pq $ map ("1",) removed
      moveChecked tSess = moveSess tSess >> totalUnchanged
  totalUnchanged
  moveChecked (0, "smp://1234-w==@alpha", Nothing)
  moveChecked (0, "smp://1234-w==@beta", Just "non-existent")
  moveChecked (0, "smp://1234-w==@localhost", Nothing)
  moveChecked (0, "smp://1234-w==@beta", Just "c3")
-- | Sums the sizes of the queue maps and of the connection maps of two
-- containers, returned as @(queues, connections)@.
totalSize :: RQ.TRcvQueues q -> RQ.TRcvQueues q -> STM (Int, Int)
totalSize a b = do
  queueSizes <- traverse (fmap M.size . readTVar . RQ.getRcvQueues) [a, b]
  connSizes <- traverse (fmap M.size . readTVar . RQ.getConnections) [a, b]
  pure (sum queueSizes, sum connSizes)
-- | Builds a receive queue fixture from the given user ID, server, connection
-- ID and recipient ID; every other field is set to a fixed value.
dummyRQ :: UserId -> SMPServer -> ConnId -> RecipientId -> RcvQueue
dummyRQ uId srv cId rId =
  RcvQueue
    { userId = uId,
      connId = cId,
      server = srv,
      rcvId = rId,
      rcvPrivateKey = C.APrivateAuthKey C.SEd25519 "MC4CAQAwBQYDK2VwBCIEIDfEfevydXXfKajz3sRkcQ7RPvfWUPoq6pu1TYHV1DEe",
      rcvDhSecret = "01234567890123456789012345678901",
      e2ePrivKey = "MC4CAQAwBQYDK2VuBCIEINCzbVFaCiYHoYncxNY8tSIfn0pXcIAhLBfFc0m+gOpk",
      e2eDhSecret = Nothing,
      sndId = NoEntity,
      queueMode = Just QMMessaging,
      shortLink = Nothing,
      clientService = Nothing,
      status = New,
      dbQueueId = DBEntityId 0,
      primary = True,
      dbReplaceQueueId = Nothing,
      rcvSwchStatus = Nothing,
      smpClientVersion = VersionSMPC 123,
      clientNtfCreds = Nothing,
      deleteErrors = 0
    }