agent: remove service queue association when service ID changed, process ENDS event, test migrating to/from service (#1677)

* agent: remove service queue association when service ID changed

* agent: process ENDS event

* agent: send service subscription error event

* agent: test migrating to/from service subscriptions, fixes

* agent: always remove service when disabled, fix service subscriptions
This commit is contained in:
Evgeny
2025-12-19 21:10:12 +00:00
committed by GitHub
parent c8a72431c0
commit a1277bf6bf
11 changed files with 338 additions and 125 deletions

View File

@@ -221,7 +221,9 @@ import Simplex.Messaging.Protocol
SMPMsgMeta,
SParty (..),
SProtocolType (..),
ServiceSubResult,
ServiceSub (..),
ServiceSubResult (..),
ServiceSubError (..),
SndPublicAuthKey,
SubscriptionMode (..),
UserProtocol,
@@ -1040,10 +1042,10 @@ newRcvConnSrv c nm userId connId enableNtfs cMode userLinkData_ clientData pqIni
createRcvQueue nonce_ qd e2eKeys = do
AgentConfig {smpClientVRange = vr} <- asks config
ntfServer_ <- if enableNtfs then newQueueNtfServer else pure Nothing
(rq, qUri, tSess, sessId) <- newRcvQueue_ c nm userId connId srvWithAuth vr qd (isJust ntfServer_) subMode nonce_ e2eKeys `catchAllErrors` \e -> liftIO (print e) >> throwE e
(rq, qUri, tSess, sessId, serviceId_) <- newRcvQueue_ c nm userId connId srvWithAuth vr qd (isJust ntfServer_) subMode nonce_ e2eKeys `catchAllErrors` \e -> liftIO (print e) >> throwE e
atomically $ incSMPServerStat c userId srv connCreated
rq' <- withStore c $ \db -> updateNewConnRcv db connId rq subMode
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId serviceId_
mapM_ (newQueueNtfSubscription c rq') ntfServer_
pure (rq', qUri)
createConnReq :: SMPQueueUri -> AM (ConnectionRequestUri c)
@@ -1291,11 +1293,11 @@ joinConnSrvAsync _c _userId _connId _enableNtfs (CRContactUri _) _cInfo _subMode
createReplyQueue :: AgentClient -> NetworkRequestMode -> ConnData -> SndQueue -> SubscriptionMode -> SMPServerWithAuth -> AM SMPQueueInfo
createReplyQueue c nm ConnData {userId, connId, enableNtfs} SndQueue {smpClientVersion} subMode srv = do
ntfServer_ <- if enableNtfs then newQueueNtfServer else pure Nothing
(rq, qUri, tSess, sessId) <- newRcvQueue c nm userId connId srv (versionToRange smpClientVersion) SCMInvitation (isJust ntfServer_) subMode
(rq, qUri, tSess, sessId, serviceId_) <- newRcvQueue c nm userId connId srv (versionToRange smpClientVersion) SCMInvitation (isJust ntfServer_) subMode
atomically $ incSMPServerStat c userId (qServer rq) connCreated
let qInfo = toVersionT qUri smpClientVersion
rq' <- withStore c $ \db -> upgradeSndConnToDuplex db connId rq subMode
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId serviceId_
mapM_ (newQueueNtfSubscription c rq') ntfServer_
pure qInfo
@@ -1451,22 +1453,14 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
Just activeUserId -> sortOn (\(uId, _) -> if uId == activeUserId then 0 else 1 :: Int) userSrvs
Nothing -> userSrvs
useServices <- readTVarIO $ useClientServices c
-- These options are possible below:
-- 1) services fully disabled:
-- No service subscriptions will be attempted, and existing services and association will remain in in the database,
-- but they will be ignored because of hasService parameter set to False.
-- This approach preserves performance for all clients that do not use services.
-- 2) at least one user ID has services enabled:
-- Service will be loaded for all user/server combinations:
-- a) service is enabled for user ID and service record exists: subscription will be attempted,
-- b) service is disabled and record exists: service record and all associations will be removed,
-- c) service is disabled or no record: no subscription attempt.
-- Service will be loaded for all user/server combinations:
-- a) service is enabled for user ID and service record exists: subscription will be attempted,
-- b) service is disabled and record exists: service record and all associations will be removed,
-- c) service is disabled or no record: no subscription attempt.
-- On successful service subscription, only unassociated queues will be subscribed.
userSrvs'' <-
if any id useServices
then lift $ mapConcurrently (subscribeService useServices) userSrvs'
else pure $ map (,False) userSrvs'
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs''
userSrvs2 <- withStore' c $ \db -> mapM (getService db useServices) userSrvs'
userSrvs3 <- lift $ mapConcurrently subscribeService userSrvs2
rs <- lift $ mapConcurrently (subscribeUserServer maxPending currPending) userSrvs3
let (errs, oks) = partitionEithers rs
logInfo $ "subscribed " <> tshow (sum oks) <> " queues"
forM_ (L.nonEmpty errs) $ notifySub c . ERRS . L.map ("",)
@@ -1475,16 +1469,30 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
resumeAllCommands c
where
handleErr = (`catchAllErrors` \e -> notifySub' c "" (ERR e) >> throwE e)
subscribeService :: Map UserId Bool -> (UserId, SMPServer) -> AM' ((UserId, SMPServer), ServiceAssoc)
subscribeService useServices us@(userId, srv) = fmap ((us,) . fromRight False) $ tryAllErrors' $ do
withStore' c (\db -> getSubscriptionService db userId srv) >>= \case
getService :: DB.Connection -> Map UserId Bool -> (UserId, SMPServer) -> IO ((UserId, SMPServer), Maybe ServiceSub)
getService db useServices us@(userId, srv) =
fmap (us,) $ getSubscriptionService db userId srv >>= \case
Just serviceSub -> case M.lookup userId useServices of
Just True -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \case
Left e | clientServiceError e -> unassocQueues $> False
Just True -> pure $ Just serviceSub
_ -> Nothing <$ unassocUserServerRcvQueueSubs' db userId srv
_ -> pure Nothing
subscribeService :: ((UserId, SMPServer), Maybe ServiceSub) -> AM' ((UserId, SMPServer), ServiceAssoc)
subscribeService (us@(userId, srv), serviceSub_) = fmap ((us,) . fromRight False) $ tryAllErrors' $
case serviceSub_ of
Just serviceSub -> tryAllErrors (subscribeClientService c True userId srv serviceSub) >>= \case
Right (ServiceSubResult e _) -> case e of
Just SSErrorServiceId {} -> unassocQueues
-- Possibly, we should always resubscribe all when expected is greater than subscribed
Just SSErrorQueueCount {expectedQueueCount = n, subscribedQueueCount = n'} | n > 0 && n' == 0 -> unassocQueues
_ -> pure True
_ -> unassocQueues $> False
Left e -> do
atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR e)
if clientServiceError e
then unassocQueues
else pure True
where
unassocQueues = withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv
unassocQueues :: AM Bool
unassocQueues = False <$ withStore' c (\db -> unassocUserServerRcvQueueSubs' db userId srv)
_ -> pure False
subscribeUserServer :: Int -> TVar Int -> ((UserId, SMPServer), ServiceAssoc) -> AM' (Either AgentErrorType Int)
subscribeUserServer maxPending currPending ((userId, srv), hasService) = do
@@ -2219,10 +2227,10 @@ switchDuplexConnection c nm (DuplexConnection cData@ConnData {connId, userId} rq
srv' <- if srv == server then getNextSMPServer c userId [server] else pure srvAuth
-- TODO [notications] possible improvement would be to create ntf credentials here, to avoid creating them after rotation completes.
-- The problem is that currently subscription already exists, and we do not support queues with credentials but without subscriptions.
(q, qUri, tSess, sessId) <- newRcvQueue c nm userId connId srv' clientVRange SCMInvitation False SMSubscribe
(q, qUri, tSess, sessId, serviceId_) <- newRcvQueue c nm userId connId srv' clientVRange SCMInvitation False SMSubscribe
let rq' = (q :: NewRcvQueue) {primary = True, dbReplaceQueueId = Just dbQueueId}
rq'' <- withStore c $ \db -> addConnRcvQueue db connId rq' SMSubscribe
lift $ addNewQueueSubscription c rq'' tSess sessId
lift $ addNewQueueSubscription c rq'' tSess sessId serviceId_
void . enqueueMessages c cData sqs SMP.noMsgFlags $ QADD [(qUri, Just (server, sndId))]
rq1 <- withStore' c $ \db -> setRcvSwitchStatus db rq $ Just RSSendingQADD
let rqs' = updatedQs rq1 rqs <> [rq'']
@@ -2908,7 +2916,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandlePar
processSubOk :: RcvQueue -> TVar [ConnId] -> TVar [RcvQueue] -> Maybe SMP.ServiceId -> IO ()
processSubOk rq@RcvQueue {connId} upConnIds serviceRQs serviceId_ =
atomically . whenM (isPendingSub rq) $ do
SS.addActiveSub tSess sessId rq $ currentSubs c
SS.addActiveSub tSess sessId serviceId_ rq $ currentSubs c
modifyTVar' upConnIds (connId :)
when (isJust serviceId_ && serviceId_ == clientServiceId_) $ modifyTVar' serviceRQs (rq :)
clientServiceId_ = (\THClientService {serviceId} -> serviceId) <$> (clientService =<< thAuth)
@@ -3115,16 +3123,26 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), THandlePar
notifyEnd removed
| removed = notify END >> logServer "<--" c srv rId "END"
| otherwise = logServer "<--" c srv rId "END from disconnected client - ignored"
-- TODO [certs rcv]
r@(SMP.ENDS _) -> unexpected r
SMP.ENDS n idsHash ->
atomically (ifM (activeClientSession c tSess sessId) (SS.deleteServiceSub tSess (currentSubs c) $> True) (pure False))
>>= notifyEnd
where
notifyEnd removed
| removed = do
forM_ clientServiceId_ $ \serviceId ->
notify_ B.empty $ SERVICE_END srv $ ServiceSub serviceId n idsHash
logServer "<--" c srv rId "ENDS"
| otherwise = logServer "<--" c srv rId "ENDS from disconnected client - ignored"
-- TODO [certs rcv] Possibly, we need to add some flag to connection that it was deleted
SMP.DELD -> atomically (removeSubscription c tSess connId rq) >> notify DELD
SMP.ERR e -> notify $ ERR $ SMP (B.unpack $ strEncode srv) e
r -> unexpected r
where
notify :: forall e m. (AEntityI e, MonadIO m) => AEvent e -> m ()
notify msg =
let t = ("", connId, AEvt (sAEntity @e) msg)
notify = notify_ connId
notify_ :: forall e m. (AEntityI e, MonadIO m) => ConnId -> AEvent e -> m ()
notify_ connId' msg =
let t = ("", connId', AEvt (sAEntity @e) msg)
in atomically $ ifM (isFullTBQueue subQ) (modifyTVar' pendingMsgs (t :)) (writeTBQueue subQ t)
prohibited :: Text -> AM ()

View File

@@ -266,7 +266,6 @@ import Simplex.Messaging.Protocol
NetworkError (..),
MsgFlags (..),
MsgId,
IdsHash,
NtfServer,
NtfServerWithAuth,
ProtoServer,
@@ -283,6 +282,7 @@ import Simplex.Messaging.Protocol
SProtocolType (..),
ServiceSub (..),
ServiceSubResult (..),
ServiceSubError (..),
SndPublicAuthKey,
SubscriptionMode (..),
NewNtfCreds (..),
@@ -1420,7 +1420,7 @@ getSessionMode :: AgentClient -> STM TransportSessionMode
getSessionMode = fmap (sessionMode . snd) . readTVar . useNetworkConfig
{-# INLINE getSessionMode #-}
newRcvQueue :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> SConnectionMode c -> Bool -> SubscriptionMode -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId)
newRcvQueue :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> SConnectionMode c -> Bool -> SubscriptionMode -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId, Maybe ServiceId)
newRcvQueue c nm userId connId srv vRange cMode enableNtfs subMode = do
let qrd = case cMode of SCMInvitation -> CQRMessaging Nothing; SCMContact -> CQRContact Nothing
e2eKeys <- atomically . C.generateKeyPair =<< asks random
@@ -1441,7 +1441,7 @@ queueReqData = \case
CQRMessaging d -> QRMessaging $ srvReq <$> d
CQRContact d -> QRContact $ srvReq <$> d
newRcvQueue_ :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> ClntQueueReqData -> Bool -> SubscriptionMode -> Maybe C.CbNonce -> C.KeyPairX25519 -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId)
newRcvQueue_ :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> SMPServerWithAuth -> VersionRangeSMPC -> ClntQueueReqData -> Bool -> SubscriptionMode -> Maybe C.CbNonce -> C.KeyPairX25519 -> AM (NewRcvQueue, SMPQueueUri, SMPTransportSession, SessionId, Maybe ServiceId)
newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enableNtfs subMode nonce_ (e2eDhKey, e2ePrivKey) = do
C.AuthAlg a <- asks (rcvAuthAlg . config)
g <- asks random
@@ -1483,7 +1483,7 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl
deleteErrors = 0
}
qUri = SMPQueueUri vRange $ SMPQueueAddress srv sndId e2eDhKey queueMode
pure (rq, qUri, tSess, sessionId thParams')
pure (rq, qUri, tSess, sessionId thParams', sessServiceId)
where
mkNtfCreds :: (C.AlgorithmI a, C.AuthAlgorithm a) => C.SAlgorithm a -> TVar ChaChaDRG -> SMPClient -> IO (Maybe (C.AAuthKeyPair, C.PrivateKeyX25519), Maybe NewNtfCreds)
mkNtfCreds a g smp
@@ -1526,23 +1526,23 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl
processSubResults :: AgentClient -> SMPTransportSession -> SessionId -> Maybe ServiceId -> NonEmpty (RcvQueueSub, Either SMPClientError (Maybe ServiceId)) -> STM ([RcvQueueSub], [(RcvQueueSub, Maybe ClientNotice)])
processSubResults c tSess@(userId, srv, _) sessId serviceId_ rs = do
pending <- SS.getPendingSubs tSess $ currentSubs c
let (failed, subscribed@(qs, sQs), notices, ignored) = foldr (partitionResults pending) (M.empty, ([], []), [], 0) rs
pendingSubs <- SS.getPendingQueueSubs tSess $ currentSubs c
let (failed, subscribed@(qs, sQs), notices, ignored) = foldr (partitionResults pendingSubs) (M.empty, ([], []), [], 0) rs
unless (M.null failed) $ do
incSMPServerStat' c userId srv connSubErrs $ M.size failed
failSubscriptions c tSess failed
unless (null qs && null sQs) $ do
incSMPServerStat' c userId srv connSubscribed $ length qs + length sQs
SS.batchAddActiveSubs tSess sessId subscribed $ currentSubs c
SS.batchAddActiveSubs tSess sessId serviceId_ subscribed $ currentSubs c
unless (ignored == 0) $ incSMPServerStat' c userId srv connSubIgnored ignored
pure (sQs, notices)
where
partitionResults ::
(Map SMP.RecipientId RcvQueueSub, Maybe ServiceSub) ->
Map SMP.RecipientId RcvQueueSub ->
(RcvQueueSub, Either SMPClientError (Maybe ServiceId)) ->
(Map SMP.RecipientId SMPClientError, ([RcvQueueSub], [RcvQueueSub]), [(RcvQueueSub, Maybe ClientNotice)], Int) ->
(Map SMP.RecipientId SMPClientError, ([RcvQueueSub], [RcvQueueSub]), [(RcvQueueSub, Maybe ClientNotice)], Int)
partitionResults (pendingSubs, pendingSS) (rq@RcvQueueSub {rcvId, clientNoticeId}, r) acc@(failed, subscribed@(qs, sQs), notices, ignored) = case r of
partitionResults pendingSubs (rq@RcvQueueSub {rcvId, clientNoticeId}, r) acc@(failed, subscribed@(qs, sQs), notices, ignored) = case r of
Left e -> case smpErrorClientNotice e of
Just notice_ -> (failed', subscribed, (rq, notice_) : notices, ignored)
where
@@ -1554,8 +1554,8 @@ processSubResults c tSess@(userId, srv, _) sessId serviceId_ rs = do
failed' = M.insert rcvId e failed
Right serviceId_'
| rcvId `M.member` pendingSubs ->
let subscribed' = case (serviceId_, serviceId_', pendingSS) of
(Just sId, Just sId', Just ServiceSub {smpServiceId}) | sId == sId' && sId == smpServiceId -> (qs, rq : sQs)
let subscribed' = case (serviceId_, serviceId_') of
(Just sId, Just sId') | sId == sId' -> (qs, rq : sQs)
_ -> (rq : qs, sQs)
in (failed, subscribed', notices', ignored)
| otherwise -> (failed, subscribed, notices', ignored + 1)
@@ -1726,11 +1726,18 @@ processClientNotices c@AgentClient {presetServers} tSess notices = do
resubscribeClientService :: AgentClient -> SMPTransportSession -> ServiceSub -> AM ServiceSubResult
resubscribeClientService c tSess@(userId, srv, _) serviceSub =
withServiceClient c tSess (\smp _ -> subscribeClientService_ c True tSess smp serviceSub) `catchE` \e -> do
when (clientServiceError e) $ do
tryAllErrors (withServiceClient c tSess $ \smp _ -> subscribeClientService_ c True tSess smp serviceSub) >>= \case
Right r@(ServiceSubResult e _) -> case e of
Just SSErrorServiceId {} -> unassocSubscribeQueues $> r
_ -> pure r
Left e -> do
when (clientServiceError e) $ unassocSubscribeQueues
atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR e)
throwE e
where
unassocSubscribeQueues = do
qs <- withStore' c $ \db -> unassocUserServerRcvQueueSubs db userId srv
void $ lift $ subscribeUserServerQueues c userId srv qs
throwE e
-- TODO [certs rcv] update service in the database if it has different ID and re-associate queues, and send event
subscribeClientService :: AgentClient -> Bool -> UserId -> SMPServer -> ServiceSub -> AM ServiceSubResult
@@ -1751,7 +1758,7 @@ withServiceClient c tSess subscribe =
-- TODO [certs rcv] send subscription error event?
subscribeClientService_ :: AgentClient -> Bool -> SMPTransportSession -> SMPClient -> ServiceSub -> ExceptT SMPClientError IO ServiceSubResult
subscribeClientService_ c withEvent tSess@(userId, srv, _) smp expected@(ServiceSub _ n idsHash) = do
subscribeClientService_ c withEvent tSess@(_, srv, _) smp expected@(ServiceSub _ n idsHash) = do
subscribed <- subscribeService smp SMP.SRecipientService n idsHash
let sessId = sessionId $ thParams smp
r = serviceSubResult expected subscribed
@@ -1821,14 +1828,14 @@ getRemovedSubs AgentClient {removedSubs} k = TM.lookup k removedSubs >>= maybe n
TM.insert k s removedSubs
pure s
addNewQueueSubscription :: AgentClient -> RcvQueue -> SMPTransportSession -> SessionId -> AM' ()
addNewQueueSubscription c rq' tSess sessId = do
addNewQueueSubscription :: AgentClient -> RcvQueue -> SMPTransportSession -> SessionId -> Maybe ServiceId -> AM' ()
addNewQueueSubscription c rq' tSess sessId serviceId_ = do
let rq = rcvQueueSub rq'
same <- atomically $ do
modifyTVar' (subscrConns c) $ S.insert $ qConnId rq
active <- activeClientSession c tSess sessId
if active
then SS.addActiveSub tSess sessId rq' $ currentSubs c
then SS.addActiveSub tSess sessId serviceId_ rq' $ currentSubs c
else SS.addPendingSub tSess rq $ currentSubs c
pure active
unless same $ resubscribeSMPSession c tSess

View File

@@ -393,6 +393,7 @@ data AEvent (e :: AEntity) where
SERVICE_ALL :: SMPServer -> AEvent AENone -- all service messages are delivered
SERVICE_DOWN :: SMPServer -> ServiceSub -> AEvent AENone
SERVICE_UP :: SMPServer -> ServiceSubResult -> AEvent AENone
SERVICE_END :: SMPServer -> ServiceSub -> AEvent AENone
SWITCH :: QueueDirection -> SwitchPhase -> ConnectionStats -> AEvent AEConn
RSYNC :: RatchetSyncState -> Maybe AgentCryptoError -> ConnectionStats -> AEvent AEConn
SENT :: AgentMsgId -> Maybe SMPServer -> AEvent AEConn
@@ -467,6 +468,7 @@ data AEventTag (e :: AEntity) where
SERVICE_ALL_ :: AEventTag AENone
SERVICE_DOWN_ :: AEventTag AENone
SERVICE_UP_ :: AEventTag AENone
SERVICE_END_ :: AEventTag AENone
SWITCH_ :: AEventTag AEConn
RSYNC_ :: AEventTag AEConn
SENT_ :: AEventTag AEConn
@@ -525,6 +527,7 @@ aEventTag = \case
SERVICE_ALL _ -> SERVICE_ALL_
SERVICE_DOWN {} -> SERVICE_DOWN_
SERVICE_UP {} -> SERVICE_UP_
SERVICE_END {} -> SERVICE_END_
SWITCH {} -> SWITCH_
RSYNC {} -> RSYNC_
SENT {} -> SENT_

View File

@@ -38,7 +38,6 @@ module Simplex.Messaging.Agent.Store.AgentStore
-- * Client services
createClientService,
getClientServiceCredentials,
getSubscriptionServices,
getSubscriptionService,
getClientServiceServers,
setClientServiceId,
@@ -55,6 +54,7 @@ module Simplex.Messaging.Agent.Store.AgentStore
getSubscriptionServers,
getUserServerRcvQueueSubs,
unassocUserServerRcvQueueSubs,
unassocUserServerRcvQueueSubs',
unsetQueuesToSubscribe,
setRcvServiceAssocs,
removeRcvServiceAssocs,
@@ -344,7 +344,7 @@ handleSQLError err e = case constraintViolation e of
handleSQLError :: StoreError -> SQLError -> StoreError
handleSQLError err e
| SQL.sqlError e == SQL.ErrorConstraint = err
| otherwise = SEInternal $ bshow e
| otherwise = SEInternal $ encodeUtf8 $ tshow e <> ": " <> SQL.sqlErrorDetails e <> ", " <> SQL.sqlErrorContext e
#endif
createUserRecord :: DB.Connection -> IO UserId
@@ -439,11 +439,6 @@ getClientServiceCredentials db userId srv =
where
toService (kh, cert, pk, serviceId_) = ((kh, (cert, pk)), serviceId_)
getSubscriptionServices :: DB.Connection -> IO [(UserId, (SMPServer, ServiceSub))]
getSubscriptionServices db = map toUserService <$> DB.query_ db clientServiceQuery
where
toUserService (Only userId :. serviceRow) = (userId, toServerService serviceRow)
getSubscriptionService :: DB.Connection -> UserId -> SMPServer -> IO (Maybe ServiceSub)
getSubscriptionService db userId (SMPServer h p kh) =
maybeFirstRow toService $
@@ -453,7 +448,7 @@ getSubscriptionService db userId (SMPServer h p kh) =
SELECT c.service_id, c.service_queue_count, c.service_queue_ids_hash
FROM client_services c
JOIN servers s ON s.host = c.host AND s.port = c.port
WHERE c.user_id = ? AND c.host = ? AND c.port = ? AND COALESCE(c.server_key_hash, s.key_hash) = ?
WHERE c.user_id = ? AND c.host = ? AND c.port = ? AND COALESCE(c.server_key_hash, s.key_hash) = ? AND service_id IS NOT NULL
|]
(userId, h, p, kh)
where
@@ -461,15 +456,16 @@ getSubscriptionService db userId (SMPServer h p kh) =
getClientServiceServers :: DB.Connection -> UserId -> IO [(SMPServer, ServiceSub)]
getClientServiceServers db userId =
map toServerService <$> DB.query db (clientServiceQuery <> " WHERE c.user_id = ?") (Only userId)
clientServiceQuery :: Query
clientServiceQuery =
[sql|
SELECT c.host, c.port, COALESCE(c.server_key_hash, s.key_hash), c.service_id, c.service_queue_count, c.service_queue_ids_hash
FROM client_services c
JOIN servers s ON s.host = c.host AND s.port = c.port
|]
map toServerService <$>
DB.query
db
[sql|
SELECT c.host, c.port, COALESCE(c.server_key_hash, s.key_hash), c.service_id, c.service_queue_count, c.service_queue_ids_hash
FROM client_services c
JOIN servers s ON s.host = c.host AND s.port = c.port
WHERE c.user_id = ? AND service_id IS NOT NULL
|]
(Only userId)
toServerService :: (NonEmpty TransportHost, ServiceName, C.KeyHash, ServiceId, Int64, Binary ByteString) -> (ProtocolServer 'PSMP, ServiceSub)
toServerService (host, port, kh, serviceId, n, Binary idsHash) =
@@ -487,14 +483,20 @@ setClientServiceId db userId srv serviceId =
(serviceId, userId, host srv, port srv)
deleteClientService :: DB.Connection -> UserId -> SMPServer -> IO ()
deleteClientService db userId srv =
deleteClientService db userId (SMPServer h p kh) =
DB.execute
db
[sql|
DELETE FROM client_services
WHERE user_id = ? AND host = ? AND port = ?
AND EXISTS (
SELECT 1 FROM servers s
WHERE s.host = client_services.host
AND s.port = client_services.port
AND COALESCE(client_services.server_key_hash, s.key_hash) = ?
);
|]
(userId, host srv, port srv)
(userId, h, p, Just kh)
deleteClientServices :: DB.Connection -> UserId -> IO ()
deleteClientServices db userId = do
@@ -2279,7 +2281,8 @@ getUserServerRcvQueueSubs db userId (SMPServer h p kh) onlyNeeded hasService =
| otherwise = ""
unassocUserServerRcvQueueSubs :: DB.Connection -> UserId -> SMPServer -> IO [RcvQueueSub]
unassocUserServerRcvQueueSubs db userId (SMPServer h p kh) =
unassocUserServerRcvQueueSubs db userId srv@(SMPServer h p kh) = do
deleteClientService db userId srv
map toRcvQueueSub
<$> DB.query
db
@@ -2293,6 +2296,11 @@ unassocUserServerRcvQueueSubs db userId (SMPServer h p kh) =
rcv_queues.rcv_queue_id, rcv_queues.rcv_primary, rcv_queues.replace_rcv_queue_id
|]
unassocUserServerRcvQueueSubs' :: DB.Connection -> UserId -> SMPServer -> IO ()
unassocUserServerRcvQueueSubs' db userId srv@(SMPServer h p kh) = do
deleteClientService db userId srv
DB.execute db removeRcvAssocsQuery (h, p, userId, kh)
unsetQueuesToSubscribe :: DB.Connection -> IO ()
unsetQueuesToSubscribe db = DB.execute_ db "UPDATE rcv_queues SET to_subscribe = 0 WHERE to_subscribe = 1"

View File

@@ -23,8 +23,10 @@ module Simplex.Messaging.Agent.TSessionSubs
batchDeletePendingSubs,
deleteSub,
batchDeleteSubs,
deleteServiceSub,
hasPendingSubs,
getPendingSubs,
getPendingQueueSubs,
getActiveSubs,
setSubsPending,
updateClientNotices,
@@ -39,12 +41,12 @@ import Data.Int (Int64)
import Data.List (foldl')
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (isJust)
import Data.Maybe (fromMaybe, isJust)
import qualified Data.Set as S
import Simplex.Messaging.Agent.Protocol (SMPQueue (..))
import Simplex.Messaging.Agent.Store (RcvQueue, RcvQueueSub (..), SomeRcvQueue, StoredRcvQueue (rcvServiceAssoc), rcvQueueSub)
import Simplex.Messaging.Agent.Store (RcvQueue, RcvQueueSub (..), ServiceAssoc, SomeRcvQueue, StoredRcvQueue (rcvServiceAssoc), rcvQueueSub)
import Simplex.Messaging.Client (SMPTransportSession, TransportSessionMode (..))
import Simplex.Messaging.Protocol (RecipientId, ServiceSub (..), queueIdHash)
import Simplex.Messaging.Protocol (IdsHash, RecipientId, ServiceSub (..), queueIdHash)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport
@@ -119,40 +121,48 @@ setActiveServiceSub tSess sessId serviceSub ss = do
writeTVar (pendingServiceSub s) Nothing
else writeTVar (pendingServiceSub s) $ Just serviceSub
addActiveSub :: SMPTransportSession -> SessionId -> RcvQueue -> TSessionSubs -> STM ()
addActiveSub tSess sessId rq = addActiveSub' tSess sessId (rcvQueueSub rq) (rcvServiceAssoc rq)
addActiveSub :: SMPTransportSession -> SessionId -> Maybe ServiceId -> RcvQueue -> TSessionSubs -> STM ()
addActiveSub tSess sessId serviceId_ rq = addActiveSub' tSess sessId serviceId_ (rcvQueueSub rq) (rcvServiceAssoc rq)
{-# INLINE addActiveSub #-}
addActiveSub' :: SMPTransportSession -> SessionId -> RcvQueueSub -> Bool -> TSessionSubs -> STM ()
addActiveSub' tSess sessId rq serviceAssoc ss = do
addActiveSub' :: SMPTransportSession -> SessionId -> Maybe ServiceId -> RcvQueueSub -> ServiceAssoc -> TSessionSubs -> STM ()
addActiveSub' tSess sessId serviceId_ rq serviceAssoc ss = do
s <- getSessSubs tSess ss
sessId' <- readTVar $ subsSessId s
let rId = rcvId rq
if Just sessId == sessId'
then do
TM.insert rId rq $ activeSubs s
TM.delete rId $ pendingSubs s
when serviceAssoc $
let updateServiceSub (ServiceSub serviceId n idsHash) = ServiceSub serviceId (n + 1) (idsHash <> queueIdHash rId)
in modifyTVar' (activeServiceSub s) (updateServiceSub <$>)
case serviceId_ of
Just serviceId | serviceAssoc -> updateActiveService s serviceId 1 (queueIdHash rId)
_ -> TM.insert rId rq $ activeSubs s
else TM.insert rId rq $ pendingSubs s
batchAddActiveSubs :: SMPTransportSession -> SessionId -> ([RcvQueueSub], [RcvQueueSub]) -> TSessionSubs -> STM ()
batchAddActiveSubs tSess sessId (rqs, serviceRQs) ss = do
batchAddActiveSubs :: SMPTransportSession -> SessionId -> Maybe ServiceId -> ([RcvQueueSub], [RcvQueueSub]) -> TSessionSubs -> STM ()
batchAddActiveSubs tSess sessId serviceId_ (rqs, serviceRQs) ss = do
s <- getSessSubs tSess ss
sessId' <- readTVar $ subsSessId s
let qs = M.fromList $ map (\rq -> (rcvId rq, rq)) rqs
let qs = queuesMap rqs
serviceQs = queuesMap serviceRQs
if Just sessId == sessId'
then do
TM.union qs $ activeSubs s
modifyTVar' (pendingSubs s) (`M.difference` qs)
serviceSub_ <- readTVar $ activeServiceSub s
forM_ serviceSub_ $ \(ServiceSub serviceId n idsHash) -> do
unless (null serviceRQs) $ do
let idsHash' = idsHash <> mconcat (map (queueIdHash . rcvId) serviceRQs)
n' = n + fromIntegral (length serviceRQs)
writeTVar (activeServiceSub s) $ Just $ ServiceSub serviceId n' idsHash'
else TM.union qs $ pendingSubs s
unless (null serviceRQs) $ forM_ serviceId_ $ \serviceId -> do
modifyTVar' (pendingSubs s) (`M.difference` serviceQs)
updateActiveService s serviceId (fromIntegral $ length serviceRQs) (mconcat $ map (queueIdHash . rcvId) serviceRQs)
else do
TM.union qs $ pendingSubs s
when (isJust serviceId_ && not (null serviceRQs)) $ TM.union serviceQs $ pendingSubs s
where
queuesMap = M.fromList . map (\rq -> (rcvId rq, rq))
updateActiveService :: SessSubs -> ServiceId -> Int64 -> IdsHash -> STM ()
updateActiveService s serviceId addN addIdsHash = do
ServiceSub serviceId' n idsHash <-
fromMaybe (ServiceSub serviceId 0 mempty) <$> readTVar (activeServiceSub s)
when (serviceId == serviceId') $
writeTVar (activeServiceSub s) $ Just $ ServiceSub serviceId (n + addN) (idsHash <> addIdsHash)
batchAddPendingSubs :: SMPTransportSession -> [RcvQueueSub] -> TSessionSubs -> STM ()
batchAddPendingSubs tSess rqs ss = do
@@ -176,6 +186,9 @@ batchDeleteSubs tSess rqs = lookupSubs tSess >=> mapM_ (\s -> delete (activeSubs
rIds = S.fromList $ map queueId rqs
delete = (`modifyTVar'` (`M.withoutKeys` rIds))
deleteServiceSub :: SMPTransportSession -> TSessionSubs -> STM ()
deleteServiceSub tSess = lookupSubs tSess >=> mapM_ (\s -> writeTVar (activeServiceSub s) Nothing >> writeTVar (pendingServiceSub s) Nothing)
hasPendingSubs :: SMPTransportSession -> TSessionSubs -> STM Bool
hasPendingSubs tSess = lookupSubs tSess >=> maybe (pure False) (\s -> anyM [hasSubs s, hasServiceSub s])
where
@@ -187,6 +200,10 @@ getPendingSubs tSess = lookupSubs tSess >=> maybe (pure (M.empty, Nothing)) get
where
get s = liftM2 (,) (readTVar $ pendingSubs s) (readTVar $ pendingServiceSub s)
getPendingQueueSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
getPendingQueueSubs = getSubs_ pendingSubs
{-# INLINE getPendingQueueSubs #-}
getActiveSubs :: SMPTransportSession -> TSessionSubs -> STM (Map RecipientId RcvQueueSub)
getActiveSubs = getSubs_ activeSubs
{-# INLINE getActiveSubs #-}

View File

@@ -147,7 +147,6 @@ module Simplex.Messaging.Protocol
serviceSubResult,
queueIdsHash,
queueIdHash,
noIdsHash,
addServiceSubs,
subtractServiceSubs,
MaxMessageLen,
@@ -726,7 +725,7 @@ data BrokerMsg where
RRES :: EncFwdResponse -> BrokerMsg -- relay to proxy
PRES :: EncResponse -> BrokerMsg -- proxy to client
END :: BrokerMsg
ENDS :: Int64 -> BrokerMsg
ENDS :: Int64 -> IdsHash -> BrokerMsg
DELD :: BrokerMsg
INFO :: QueueInfo -> BrokerMsg
OK :: BrokerMsg
@@ -1518,10 +1517,6 @@ instance Monoid IdsHash where
xor' :: Word8 -> Word8 -> Word8
xor' x y = let !r = xor x y in r
noIdsHash ::IdsHash
noIdsHash = IdsHash B.empty
{-# INLINE noIdsHash #-}
queueIdsHash :: [QueueId] -> IdsHash
queueIdsHash = mconcat . map queueIdHash
@@ -1535,7 +1530,7 @@ addServiceSubs (n', idsHash') (n, idsHash) = (n + n', idsHash <> idsHash')
subtractServiceSubs :: (Int64, IdsHash) -> (Int64, IdsHash) -> (Int64, IdsHash)
subtractServiceSubs (n', idsHash') (n, idsHash)
| n > n' = (n - n', idsHash <> idsHash') -- concat is a reversible xor: (x `xor` y) `xor` y == x
| otherwise = (0, noIdsHash)
| otherwise = (0, mempty)
data ProtocolErrorType = PECmdSyntax | PECmdUnknown | PESession | PEBlock
@@ -1883,7 +1878,7 @@ instance ProtocolEncoding SMPVersion ErrorType Cmd where
QUE_ -> pure QUE
CT SRecipientService SUBS_
| v >= rcvServiceSMPVersion -> Cmd SRecipientService <$> (SUBS <$> _smpP <*> smpP)
| otherwise -> pure $ Cmd SRecipientService $ SUBS (-1) noIdsHash
| otherwise -> pure $ Cmd SRecipientService $ SUBS (-1) mempty
CT SSender tag ->
Cmd SSender <$> case tag of
SKEY_ -> SKEY <$> _smpP
@@ -1902,7 +1897,7 @@ instance ProtocolEncoding SMPVersion ErrorType Cmd where
CT SNotifier NSUB_ -> pure $ Cmd SNotifier NSUB
CT SNotifierService NSUBS_
| v >= rcvServiceSMPVersion -> Cmd SNotifierService <$> (NSUBS <$> _smpP <*> smpP)
| otherwise -> pure $ Cmd SNotifierService $ NSUBS (-1) noIdsHash
| otherwise -> pure $ Cmd SNotifierService $ NSUBS (-1) mempty
fromProtocolError = fromProtocolError @SMPVersion @ErrorType @BrokerMsg
{-# INLINE fromProtocolError #-}
@@ -1925,9 +1920,7 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
SOK serviceId_
| v >= serviceCertsSMPVersion -> e (SOK_, ' ', serviceId_)
| otherwise -> e OK_ -- won't happen, the association with the service requires v >= serviceCertsSMPVersion
SOKS n idsHash
| v >= rcvServiceSMPVersion -> e (SOKS_, ' ', n, idsHash)
| otherwise -> e (SOKS_, ' ', n)
SOKS n idsHash -> serviceResp SOKS_ n idsHash
MSG RcvMessage {msgId, msgBody = EncRcvMsgBody body} ->
e (MSG_, ' ', msgId, Tail body)
ALLS -> e ALLS_
@@ -1937,7 +1930,7 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
RRES (EncFwdResponse encBlock) -> e (RRES_, ' ', Tail encBlock)
PRES (EncResponse encBlock) -> e (PRES_, ' ', Tail encBlock)
END -> e END_
ENDS n -> e (ENDS_, ' ', n)
ENDS n idsHash -> serviceResp ENDS_ n idsHash
DELD
| v >= deletedEventSMPVersion -> e DELD_
| otherwise -> e END_
@@ -1954,6 +1947,9 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
where
e :: Encoding a => a -> ByteString
e = smpEncode
serviceResp tag n idsHash
| v >= serviceCertsSMPVersion = e (tag, ' ', n, idsHash)
| otherwise = e (tag, ' ', n)
protocolP v = \case
MSG_ -> do
@@ -1982,21 +1978,23 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
pure $ IDS QIK {rcvId, sndId, rcvPublicDhKey, queueMode, linkId, serviceId, serverNtfCreds}
LNK_ -> LNK <$> _smpP <*> smpP
SOK_ -> SOK <$> _smpP
SOKS_
| v >= rcvServiceSMPVersion -> SOKS <$> _smpP <*> smpP
| otherwise -> SOKS <$> _smpP <*> pure noIdsHash
SOKS_ -> serviceRespP SOKS
NID_ -> NID <$> _smpP <*> smpP
NMSG_ -> NMSG <$> _smpP <*> smpP
PKEY_ -> PKEY <$> _smpP <*> smpP <*> smpP
RRES_ -> RRES <$> (EncFwdResponse . unTail <$> _smpP)
PRES_ -> PRES <$> (EncResponse . unTail <$> _smpP)
END_ -> pure END
ENDS_ -> ENDS <$> _smpP
ENDS_ -> serviceRespP ENDS
DELD_ -> pure DELD
INFO_ -> INFO <$> _smpP
OK_ -> pure OK
ERR_ -> ERR <$> _smpP
PONG_ -> pure PONG
where
serviceRespP resp
| v >= serviceCertsSMPVersion = resp <$> _smpP <*> smpP
| otherwise = resp <$> _smpP <*> pure mempty
fromProtocolError = \case
PECmdSyntax -> CMD SYNTAX

View File

@@ -316,8 +316,8 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg, startOpt
cancelServiceSubs :: ServiceId -> Maybe (Client s) -> STM [PrevClientSub s]
cancelServiceSubs serviceId =
checkAnotherClient $ \c -> do
changedSubs@(n, _) <- swapTVar (clientServiceSubs c) (0, noIdsHash)
pure [(c, CSADecreaseSubs changedSubs, (serviceId, ENDS n))]
changedSubs@(n, idsHash) <- swapTVar (clientServiceSubs c) (0, mempty)
pure [(c, CSADecreaseSubs changedSubs, (serviceId, ENDS n idsHash))]
checkAnotherClient :: (Client s -> STM [PrevClientSub s]) -> Maybe (Client s) -> STM [PrevClientSub s]
checkAnotherClient mkSub = \case
Just c@Client {clientId, connected} | clntId /= clientId ->

View File

@@ -502,7 +502,7 @@ newServerSubscribers = do
subQ <- newTQueueIO
queueSubscribers <- SubscribedClients <$> TM.emptyIO
serviceSubscribers <- SubscribedClients <$> TM.emptyIO
totalServiceSubs <- newTVarIO (0, noIdsHash)
totalServiceSubs <- newTVarIO (0, mempty)
subClients <- newTVarIO IS.empty
pendingEvents <- newTVarIO IM.empty
pure ServerSubscribers {subQ, queueSubscribers, serviceSubscribers, totalServiceSubs, subClients, pendingEvents}
@@ -513,8 +513,8 @@ newClient clientId qSize clientTHParams createdAt = do
ntfSubscriptions <- TM.emptyIO
serviceSubscribed <- newTVarIO False
ntfServiceSubscribed <- newTVarIO False
serviceSubsCount <- newTVarIO (0, noIdsHash)
ntfServiceSubsCount <- newTVarIO (0, noIdsHash)
serviceSubsCount <- newTVarIO (0, mempty)
ntfServiceSubsCount <- newTVarIO (0, mempty)
rcvQ <- newTBQueueIO qSize
sndQ <- newTBQueueIO qSize
msgQ <- newTBQueueIO qSize

View File

@@ -480,6 +480,7 @@ functionalAPITests ps = do
describe "Client service certificates" $ do
it "should connect, subscribe and reconnect as a service" $ testClientServiceConnection ps
it "should re-subscribe when service ID changed" $ testClientServiceIDChange ps
it "migrate connections to and from service" $ testMigrateConnectionsToService ps
describe "Connection switch" $ do
describe "should switch delivery to the new queue" $
testServerMatrix2 ps testSwitchConnection
@@ -3721,10 +3722,22 @@ testClientServiceConnection ps = do
testClientServiceIDChange :: HasCallStack => (ASrvTransport, AStoreType) -> IO ()
testClientServiceIDChange ps@(_, ASType qs _) = do
(sId, uId) <- withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do
withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
conns <- withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
conns@(sId, uId) <- makeConnection service user
exchangeGreetings service uId user sId
pure conns
("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 1 _)) <- nGet service
("", "", DOWN _ [_]) <- nGet user
withSmpServerStoreLogOn ps testPort $ \_ -> do
getInAnyOrder service
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 1 _)))) -> True; _ -> False,
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False
]
("", "", UP _ [_]) <- nGet user
pure ()
("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 1 _)) <- nGet service
("", "", DOWN _ [_]) <- nGet user
pure conns
_ :: () <- case qs of
SQSPostgres -> do
#if defined(dbServerPostgres)
@@ -3739,19 +3752,21 @@ testClientServiceIDChange ps@(_, ASType qs _) = do
writeFile testStoreLogFile $ unlines $ filter (not . ("NEW_SERVICE" `isPrefixOf`)) $ lines s
withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do
withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
liftIO $ threadDelay 250000
subscribeAllConnections service False Nothing
liftIO $ getInAnyOrder service
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult (Just (SMP.SSErrorQueueCount 1 0)) (SMP.ServiceSub _ 0 _)))) -> True; _ -> False,
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False,
\case ("", "", AEvt SAENone (UP _ _)) -> True; _ -> False
\case ("", "", AEvt SAENone (UP _ [_])) -> True; _ -> False
]
subscribeAllConnections user False Nothing
("", "", UP _ [_]) <- nGet user
exchangeGreetingsMsgId 4 service uId user sId
("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 1 _)) <- nGet service
("", "", DOWN _ [_]) <- nGet user
pure ()
-- disable service in the client
-- The test uses True for non-existing user to make sure it's removed for user 1,
-- because if no users use services, then it won't be checking them to optimize for most clients.
withAgentClientsServers2 (agentCfg, initAgentServers {useServices = M.fromList [(100, True)]}) (agentCfg, initAgentServers) $ \notService user -> do
withAgentClientsServers2 (agentCfg, initAgentServers) (agentCfg, initAgentServers) $ \notService user -> do
withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
subscribeAllConnections notService False Nothing
("", "", UP _ [_]) <- nGet notService
@@ -3759,6 +3774,153 @@ testClientServiceIDChange ps@(_, ASType qs _) = do
("", "", UP _ [_]) <- nGet user
exchangeGreetingsMsgId 6 notService uId user sId
testMigrateConnectionsToService :: HasCallStack => (ASrvTransport, AStoreType) -> IO ()
testMigrateConnectionsToService ps = do
(((sId1, uId1), (uId2, sId2)), ((sId3, uId3), (uId4, sId4)), ((sId5, uId5), (uId6, sId6))) <-
withSmpServerStoreLogOn ps testPort $ \_ -> do
-- starting without service
cs12@((sId1, uId1), (uId2, sId2)) <-
withAgentClientsServers2 (agentCfg, initAgentServers) (agentCfg, initAgentServers) $ \notService user ->
runRight $ (,) <$> makeConnection notService user <*> makeConnection user notService
-- migrating to service
cs34@((sId3, uId3), (uId4, sId4)) <-
withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> runRight $ do
subscribeAllConnections service False Nothing
service `up` 2
subscribeAllConnections user False Nothing
user `up` 2
exchangeGreetingsMsgId 2 service uId1 user sId1
exchangeGreetingsMsgId 2 service uId2 user sId2
(,) <$> makeConnection service user <*> makeConnection user service
-- starting as service
cs56 <-
withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> runRight $ do
subscribeAllConnections service False Nothing
liftIO $ getInAnyOrder service
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 4 _)))) -> True; _ -> False,
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False
]
subscribeAllConnections user False Nothing
user `up` 4
exchangeGreetingsMsgId 4 service uId1 user sId1
exchangeGreetingsMsgId 4 service uId2 user sId2
exchangeGreetingsMsgId 2 service uId3 user sId3
exchangeGreetingsMsgId 2 service uId4 user sId4
(,) <$> makeConnection service user <*> makeConnection user service
pure (cs12, cs34, cs56)
-- server reconnecting resubscribes service
let testSendMessages6 s u n = do
exchangeGreetingsMsgId (n + 4) s uId1 u sId1
exchangeGreetingsMsgId (n + 4) s uId2 u sId2
exchangeGreetingsMsgId (n + 2) s uId3 u sId3
exchangeGreetingsMsgId (n + 2) s uId4 u sId4
exchangeGreetingsMsgId n s uId5 u sId5
exchangeGreetingsMsgId n s uId6 u sId6
withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do
withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do
subscribeAllConnections service False Nothing
liftIO $ getInAnyOrder service
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 6 _)))) -> True; _ -> False,
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False
]
subscribeAllConnections user False Nothing
user `up` 6
testSendMessages6 service user 2
("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 6 _)) <- nGet service
user `down` 6
withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do
liftIO $ getInAnyOrder service
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 6 _)))) -> True; _ -> False,
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False
]
user `up` 6
testSendMessages6 service user 4
("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 6 _)) <- nGet service
user `down` 6
-- disabling service and adding connections
((sId7, uId7), (uId8, sId8)) <-
withAgentClientsServers2 (agentCfg, initAgentServers) (agentCfg, initAgentServers) $ \notService user -> do
cs78@((sId7, uId7), (uId8, sId8)) <-
withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
subscribeAllConnections notService False Nothing
notService `up` 6
subscribeAllConnections user False Nothing
user `up` 6
testSendMessages6 notService user 6
(,) <$> makeConnection notService user <*> makeConnection user notService
notService `down` 8
user `down` 8
withSmpServerStoreLogOn ps testPort $ \_ -> runRight $ do
notService `up` 8
user `up` 8
testSendMessages6 notService user 8
exchangeGreetingsMsgId 2 notService uId7 user sId7
exchangeGreetingsMsgId 2 notService uId8 user sId8
notService `down` 8
user `down` 8
pure cs78
let testSendMessages8 s u n = do
testSendMessages6 s u (n + 8)
exchangeGreetingsMsgId (n + 2) s uId7 u sId7
exchangeGreetingsMsgId (n + 2) s uId8 u sId8
-- re-enabling service and adding connections
withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do
withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do
subscribeAllConnections service False Nothing
service `up` 8
subscribeAllConnections user False Nothing
user `up` 8
testSendMessages8 service user 2
("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 8 _)) <- nGet service
user `down` 8
-- re-connect to server
withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do
liftIO $ getInAnyOrder service
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 8 _)))) -> True; _ -> False,
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False
]
user `up` 8
testSendMessages8 service user 4
("", "", SERVICE_DOWN _ (SMP.ServiceSub _ _ _)) <- nGet service -- should be 8 here
user `down` 8
-- restart agents
withAgentClientsServers2 (agentCfg, initAgentServersClientService) (agentCfg, initAgentServers) $ \service user -> do
withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do
subscribeAllConnections service False Nothing
liftIO $ getInAnyOrder service
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 8 _)))) -> True; _ -> False,
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False
]
subscribeAllConnections user False Nothing
user `up` 8
testSendMessages8 service user 6
("", "", SERVICE_DOWN _ (SMP.ServiceSub _ 8 _)) <- nGet service
user `down` 8
runRight_ $ do
void $ sendMessage user sId7 SMP.noMsgFlags "hello 1"
void $ sendMessage user sId8 SMP.noMsgFlags "hello 2"
-- re-connect to server
withSmpServerStoreLogOn ps testPort $ \_ -> runRight_ $ do
liftIO $ getInAnyOrder service
[ \case ("", "", AEvt SAENone (SERVICE_UP _ (SMP.ServiceSubResult Nothing (SMP.ServiceSub _ 8 _)))) -> True; _ -> False,
\case ("", c, AEvt SAEConn (Msg "hello 1")) -> c == uId7; _ -> False,
\case ("", c, AEvt SAEConn (Msg "hello 2")) -> c == uId8; _ -> False,
\case ("", "", AEvt SAENone (SERVICE_ALL _)) -> True; _ -> False
]
liftIO $ getInAnyOrder user
[ \case ("", "", AEvt SAENone (UP _ [_, _, _, _, _, _, _, _])) -> True; _ -> False,
\case ("", c, AEvt SAEConn (SENT 10)) -> c == sId7; _ -> False,
\case ("", c, AEvt SAEConn (SENT 10)) -> c == sId8; _ -> False
]
testSendMessages6 service user 16
where
up c n = do
("", "", UP _ conns) <- nGet c
liftIO $ length conns `shouldBe` n
down c n = do
("", "", DOWN _ conns) <- nGet c
liftIO $ length conns `shouldBe` n
getSMPAgentClient' :: Int -> AgentConfig -> InitialAgentServers -> String -> IO AgentClient
getSMPAgentClient' clientId cfg' initServers dbPath = do
Right st <- liftIO $ createStore dbPath

View File

@@ -69,21 +69,21 @@ testSessionSubs = do
atomically (SS.hasPendingSub tSess1 (rcvId q4) ss) `shouldReturn` False
atomically (SS.hasActiveSub tSess1 (rcvId q4) ss) `shouldReturn` False
-- setting active queue without setting session ID would keep it as pending
atomically $ SS.addActiveSub' tSess1 "123" q1 False ss
atomically $ SS.addActiveSub' tSess1 "123" Nothing q1 False ss
atomically (SS.hasPendingSub tSess1 (rcvId q1) ss) `shouldReturn` True
atomically (SS.hasActiveSub tSess1 (rcvId q1) ss) `shouldReturn` False
dumpSessionSubs ss `shouldReturn` st
countSubs ss `shouldReturn` (0, 3)
-- setting active queues
atomically $ SS.setSessionId tSess1 "123" ss
atomically $ SS.addActiveSub' tSess1 "123" q1 False ss
atomically $ SS.addActiveSub' tSess1 "123" Nothing q1 False ss
atomically (SS.hasPendingSub tSess1 (rcvId q1) ss) `shouldReturn` False
atomically (SS.hasActiveSub tSess1 (rcvId q1) ss) `shouldReturn` True
atomically (SS.getActiveSubs tSess1 ss) `shouldReturn` M.fromList [("r1", q1)]
atomically (SS.getPendingSubs tSess1 ss) `shouldReturn` (M.fromList [("r2", q2)], Nothing)
countSubs ss `shouldReturn` (1, 2)
atomically $ SS.setSessionId tSess2 "456" ss
atomically $ SS.addActiveSub' tSess2 "456" q4 False ss
atomically $ SS.addActiveSub' tSess2 "456" Nothing q4 False ss
atomically (SS.hasPendingSub tSess2 (rcvId q4) ss) `shouldReturn` False
atomically (SS.hasActiveSub tSess2 (rcvId q4) ss) `shouldReturn` True
atomically (SS.hasActiveSub tSess1 (rcvId q4) ss) `shouldReturn` False -- wrong transport session

View File

@@ -1334,7 +1334,7 @@ testMessageServiceNotifications =
Resp "4" _ (SOK (Just serviceId')) <- serviceSignSendRecv nh2 nKey servicePK ("4", nId, NSUB)
serviceId' `shouldBe` serviceId
-- service subscription is terminated
Resp "" serviceId2 (ENDS 1) <- tGet1 nh1
Resp "" serviceId2 (ENDS 1 _) <- tGet1 nh1
serviceId2 `shouldBe` serviceId
deliverMessage rh rId rKey sh sId sKey nh2 "hello again" dec
1000 `timeout` tGetClient @SMPVersion @ErrorType @BrokerMsg nh1 >>= \case
@@ -1374,7 +1374,7 @@ testMessageServiceNotifications =
Resp "12" serviceId5 (SOKS 2 idsHash') <- signSendRecv nh1 (C.APrivateAuthKey C.SEd25519 servicePK) ("12", serviceId, NSUBS 2 idsHash)
idsHash' `shouldBe` idsHash
serviceId5 `shouldBe` serviceId
Resp "" serviceId6 (ENDS 2) <- tGet1 nh2
Resp "" serviceId6 (ENDS 2 _) <- tGet1 nh2
serviceId6 `shouldBe` serviceId
deliverMessage rh rId rKey sh sId sKey nh1 "connection 1 one more" dec
deliverMessage rh rId'' rKey'' sh sId'' sKey'' nh1 "connection 2 one more" dec''