smp server: messaging services (#1565)

* smp server: refactor message delivery to always respond SOK to subscriptions

* refactor ntf subscribe

* cancel subscription thread and reduce service subscription count when queue is deleted

* subscribe rcv service, deliver sent messages to subscribed service

* subscribe rcv service to messages (TODO delivery on subscription)

* WIP

* efficient initial delivery of messages to subscribed service

* test: delivery to client with service certificate

* test: upgrade/downgrade to/from service subscriptions

* remove service association from agent API, add per-user flag to use the service

* agent client (WIP)

* service certificates in the client

* rfc about drift detection, and SALL to mark end of message delivery

* fix test

* fix test

* add function for postgresql message storage

* update migration
This commit is contained in:
Evgeny
2025-11-07 21:36:28 +00:00
committed by GitHub
parent 3016b929b4
commit 1ca4677b28
31 changed files with 969 additions and 305 deletions
+87 -63
View File
@@ -47,6 +47,7 @@ module Simplex.Messaging.Agent
withInvLock,
createUser,
deleteUser,
setUserService,
connRequestPQSupport,
createConnectionAsync,
joinConnectionAsync,
@@ -78,7 +79,7 @@ module Simplex.Messaging.Agent
getNotificationConns,
resubscribeConnection,
resubscribeConnections,
subscribeClientService,
subscribeClientServices,
sendMessage,
sendMessages,
sendMessagesB,
@@ -210,6 +211,7 @@ import Simplex.Messaging.Protocol
ErrorType (AUTH),
MsgBody,
MsgFlags (..),
IdsHash,
NtfServer,
ProtoServerWithAuth (..),
ProtocolServer (..),
@@ -340,6 +342,11 @@ deleteUser :: AgentClient -> UserId -> Bool -> AE ()
deleteUser c = withAgentEnv c .: deleteUser' c
{-# INLINE deleteUser #-}
-- | Enable using service certificate for this user
setUserService :: AgentClient -> UserId -> Bool -> AE ()
setUserService c = withAgentEnv c .: setUserService' c
{-# INLINE setUserService #-}
-- | Create SMP agent connection (NEW command) asynchronously, synchronous response is new connection id
createConnectionAsync :: ConnectionModeI c => AgentClient -> UserId -> ACorrId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AE ConnId
createConnectionAsync c userId aCorrId enableNtfs = withAgentEnv c .:. newConnAsync c userId aCorrId enableNtfs
@@ -381,7 +388,7 @@ deleteConnectionsAsync c waitDelivery = withAgentEnv c . deleteConnectionsAsync'
{-# INLINE deleteConnectionsAsync #-}
-- | Create SMP agent connection (NEW command)
createConnection :: ConnectionModeI c => AgentClient -> NetworkRequestMode -> UserId -> Bool -> Bool -> SConnectionMode c -> Maybe (UserConnLinkData c) -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> AE (ConnId, (CreatedConnLink c, Maybe ClientServiceId))
createConnection :: ConnectionModeI c => AgentClient -> NetworkRequestMode -> UserId -> Bool -> Bool -> SConnectionMode c -> Maybe (UserConnLinkData c) -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> AE (ConnId, CreatedConnLink c)
createConnection c nm userId enableNtfs checkNotices = withAgentEnv c .::. newConn c nm userId enableNtfs checkNotices
{-# INLINE createConnection #-}
@@ -424,7 +431,7 @@ prepareConnectionToAccept c userId enableNtfs = withAgentEnv c .: newConnToAccep
{-# INLINE prepareConnectionToAccept #-}
-- | Join SMP agent connection (JOIN command).
joinConnection :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE (SndQueueSecured, Maybe ClientServiceId)
joinConnection :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE SndQueueSecured
joinConnection c nm userId connId enableNtfs = withAgentEnv c .:: joinConn c nm userId connId enableNtfs
{-# INLINE joinConnection #-}
@@ -434,7 +441,7 @@ allowConnection c = withAgentEnv c .:. allowConnection' c
{-# INLINE allowConnection #-}
-- | Accept contact after REQ notification (ACPT command)
acceptContact :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> Bool -> ConfirmationId -> ConnInfo -> PQSupport -> SubscriptionMode -> AE (SndQueueSecured, Maybe ClientServiceId)
acceptContact :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> Bool -> ConfirmationId -> ConnInfo -> PQSupport -> SubscriptionMode -> AE SndQueueSecured
acceptContact c userId connId enableNtfs = withAgentEnv c .::. acceptContact' c userId connId enableNtfs
{-# INLINE acceptContact #-}
@@ -462,12 +469,12 @@ syncConnections c = withAgentEnv c .: syncConnections' c
{-# INLINE syncConnections #-}
-- | Subscribe to receive connection messages (SUB command)
subscribeConnection :: AgentClient -> ConnId -> AE (Maybe ClientServiceId)
subscribeConnection :: AgentClient -> ConnId -> AE ()
subscribeConnection c = withAgentEnv c . subscribeConnection' c
{-# INLINE subscribeConnection #-}
-- | Subscribe to receive connection messages from multiple connections, batching commands when possible
subscribeConnections :: AgentClient -> [ConnId] -> AE (Map ConnId (Either AgentErrorType (Maybe ClientServiceId)))
subscribeConnections :: AgentClient -> [ConnId] -> AE (Map ConnId (Either AgentErrorType ()))
subscribeConnections c = withAgentEnv c . subscribeConnections' c
{-# INLINE subscribeConnections #-}
@@ -485,18 +492,17 @@ getNotificationConns :: AgentClient -> C.CbNonce -> ByteString -> AE (NonEmpty N
getNotificationConns c = withAgentEnv c .: getNotificationConns' c
{-# INLINE getNotificationConns #-}
resubscribeConnection :: AgentClient -> ConnId -> AE (Maybe ClientServiceId)
resubscribeConnection :: AgentClient -> ConnId -> AE ()
resubscribeConnection c = withAgentEnv c . resubscribeConnection' c
{-# INLINE resubscribeConnection #-}
resubscribeConnections :: AgentClient -> [ConnId] -> AE (Map ConnId (Either AgentErrorType (Maybe ClientServiceId)))
resubscribeConnections :: AgentClient -> [ConnId] -> AE (Map ConnId (Either AgentErrorType ()))
resubscribeConnections c = withAgentEnv c . resubscribeConnections' c
{-# INLINE resubscribeConnections #-}
-- TODO [certs rcv] how to communicate that service ID changed - as error or as result?
subscribeClientService :: AgentClient -> ClientServiceId -> AE Int
subscribeClientService c = withAgentEnv c . subscribeClientService' c
{-# INLINE subscribeClientService #-}
subscribeClientServices :: AgentClient -> UserId -> AE (Map SMPServer (Either AgentErrorType (Int64, IdsHash)))
subscribeClientServices c = withAgentEnv c . subscribeClientServices' c
{-# INLINE subscribeClientServices #-}
-- | Send message to the connection (SEND command)
sendMessage :: AgentClient -> ConnId -> PQEncryption -> MsgFlags -> MsgBody -> AE (AgentMsgId, PQEncryption)
@@ -746,6 +752,7 @@ createUser' c smp xftp = do
userId <- withStore' c createUserRecord
atomically $ TM.insert userId (mkUserServers smp) $ smpServers c
atomically $ TM.insert userId (mkUserServers xftp) $ xftpServers c
atomically $ TM.insert userId False $ useClientServices c
pure userId
deleteUser' :: AgentClient -> UserId -> Bool -> AM ()
@@ -755,6 +762,7 @@ deleteUser' c@AgentClient {smpServersStats, xftpServersStats} userId delSMPQueue
else withStore c (`deleteUserRecord` userId)
atomically $ TM.delete userId $ smpServers c
atomically $ TM.delete userId $ xftpServers c
atomically $ TM.delete userId $ useClientServices c
atomically $ modifyTVar' smpServersStats $ M.filterWithKey (\(userId', _) _ -> userId' /= userId)
atomically $ modifyTVar' xftpServersStats $ M.filterWithKey (\(userId', _) _ -> userId' /= userId)
lift $ saveServersStats c
@@ -763,6 +771,13 @@ deleteUser' c@AgentClient {smpServersStats, xftpServersStats} userId delSMPQueue
whenM (withStore' c (`deleteUserWithoutConns` userId)) . atomically $
writeTBQueue (subQ c) ("", "", AEvt SAENone $ DEL_USER userId)
setUserService' :: AgentClient -> UserId -> Bool -> AM ()
setUserService' c userId enable = do
wasEnabled <- liftIO $ fromMaybe False <$> TM.lookupIO userId (useClientServices c)
when (enable /= wasEnabled) $ do
atomically $ TM.insert userId enable $ useClientServices c
unless enable $ withStore' c (`deleteClientServices` userId)
newConnAsync :: ConnectionModeI c => AgentClient -> UserId -> ACorrId -> Bool -> SConnectionMode c -> CR.InitialKeys -> SubscriptionMode -> AM ConnId
newConnAsync c userId corrId enableNtfs cMode pqInitKeys subMode = do
connId <- newConnNoQueues c userId enableNtfs cMode (CR.connPQEncryption pqInitKeys)
@@ -865,7 +880,7 @@ switchConnectionAsync' c corrId connId =
connectionStats c $ DuplexConnection cData rqs' sqs
_ -> throwE $ CMD PROHIBITED "switchConnectionAsync: not duplex"
newConn :: ConnectionModeI c => AgentClient -> NetworkRequestMode -> UserId -> Bool -> Bool -> SConnectionMode c -> Maybe (UserConnLinkData c) -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> AM (ConnId, (CreatedConnLink c, Maybe ClientServiceId))
newConn :: ConnectionModeI c => AgentClient -> NetworkRequestMode -> UserId -> Bool -> Bool -> SConnectionMode c -> Maybe (UserConnLinkData c) -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> AM (ConnId, CreatedConnLink c)
newConn c nm userId enableNtfs checkNotices cMode linkData_ clientData pqInitKeys subMode = do
srv <- getSMPServer c userId
when (checkNotices && connMode cMode == CMContact) $ checkClientNotices c srv
@@ -989,7 +1004,7 @@ changeConnectionUser' c oldUserId connId newUserId = do
where
updateConn = withStore' c $ \db -> setConnUserId db oldUserId connId newUserId
newRcvConnSrv :: forall c. ConnectionModeI c => AgentClient -> NetworkRequestMode -> UserId -> ConnId -> Bool -> SConnectionMode c -> Maybe (UserConnLinkData c) -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> SMPServerWithAuth -> AM (CreatedConnLink c, Maybe ClientServiceId)
newRcvConnSrv :: forall c. ConnectionModeI c => AgentClient -> NetworkRequestMode -> UserId -> ConnId -> Bool -> SConnectionMode c -> Maybe (UserConnLinkData c) -> Maybe CRClientData -> CR.InitialKeys -> SubscriptionMode -> SMPServerWithAuth -> AM (CreatedConnLink c)
newRcvConnSrv c nm userId connId enableNtfs cMode userLinkData_ clientData pqInitKeys subMode srvWithAuth@(ProtoServerWithAuth srv _) = do
case (cMode, pqInitKeys) of
(SCMContact, CR.IKUsePQ) -> throwE $ CMD PROHIBITED "newRcvConnSrv"
@@ -1000,12 +1015,12 @@ newRcvConnSrv c nm userId connId enableNtfs cMode userLinkData_ clientData pqIni
(nonce, qUri, cReq, qd) <- prepareLinkData d $ fst e2eKeys
(rq, qUri') <- createRcvQueue (Just nonce) qd e2eKeys
ccLink <- connReqWithShortLink qUri cReq qUri' (shortLink rq)
pure (ccLink, clientServiceId rq)
pure ccLink
Nothing -> do
let qd = case cMode of SCMContact -> CQRContact Nothing; SCMInvitation -> CQRMessaging Nothing
(rq, qUri) <- createRcvQueue Nothing qd e2eKeys
(_rq, qUri) <- createRcvQueue Nothing qd e2eKeys
cReq <- createConnReq qUri
pure (CCLink cReq Nothing, clientServiceId rq)
pure $ CCLink cReq Nothing
where
createRcvQueue :: Maybe C.CbNonce -> ClntQueueReqData -> C.KeyPairX25519 -> AM (RcvQueue, SMPQueueUri)
createRcvQueue nonce_ qd e2eKeys = do
@@ -1107,7 +1122,7 @@ newConnToAccept c userId connId enableNtfs invId pqSup = do
Invitation {connReq} <- withStore c $ \db -> getInvitation db "newConnToAccept" invId
newConnToJoin c userId connId enableNtfs connReq pqSup
joinConn :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AM (SndQueueSecured, Maybe ClientServiceId)
joinConn :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AM SndQueueSecured
joinConn c nm userId connId enableNtfs cReq cInfo pqSupport subMode = do
srv <- getNextSMPServer c userId [qServer $ connReqQueue cReq]
joinConnSrv c nm userId connId enableNtfs cReq cInfo pqSupport subMode srv
@@ -1187,7 +1202,7 @@ versionPQSupport_ :: VersionSMPA -> Maybe CR.VersionE2E -> PQSupport
versionPQSupport_ agentV e2eV_ = PQSupport $ agentV >= pqdrSMPAgentVersion && maybe True (>= CR.pqRatchetE2EEncryptVersion) e2eV_
{-# INLINE versionPQSupport_ #-}
joinConnSrv :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> SMPServerWithAuth -> AM (SndQueueSecured, Maybe ClientServiceId)
joinConnSrv :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> SMPServerWithAuth -> AM SndQueueSecured
joinConnSrv c nm userId connId enableNtfs inv@CRInvitationUri {} cInfo pqSup subMode srv =
withInvLock c (strEncode inv) "joinConnSrv" $ do
SomeConn cType conn <- withStore c (`getConn` connId)
@@ -1198,7 +1213,7 @@ joinConnSrv c nm userId connId enableNtfs inv@CRInvitationUri {} cInfo pqSup sub
| sqStatus == New || sqStatus == Secured -> doJoin (Just rq) (Just sq)
_ -> throwE $ CMD PROHIBITED $ "joinConnSrv: bad connection " <> show cType
where
doJoin :: Maybe RcvQueue -> Maybe SndQueue -> AM (SndQueueSecured, Maybe ClientServiceId)
doJoin :: Maybe RcvQueue -> Maybe SndQueue -> AM SndQueueSecured
doJoin rq_ sq_ = do
(cData, sq, e2eSndParams, lnkId_) <- startJoinInvitation c userId connId sq_ enableNtfs inv pqSup
secureConfirmQueue c nm cData rq_ sq srv cInfo (Just e2eSndParams) subMode
@@ -1209,14 +1224,14 @@ joinConnSrv c nm userId connId enableNtfs cReqUri@CRContactUri {} cInfo pqSup su
withInvLock c (strEncode cReqUri) "joinConnSrv" $ do
SomeConn cType conn <- withStore c (`getConn` connId)
let pqInitKeys = CR.joinContactInitialKeys (v >= pqdrSMPAgentVersion) pqSup
(CCLink cReq _, service) <- case conn of
CCLink cReq _ <- case conn of
NewConnection _ -> newRcvConnSrv c NRMBackground userId connId enableNtfs SCMInvitation Nothing Nothing pqInitKeys subMode srv
RcvConnection _ rq -> mkJoinInvitation rq pqInitKeys
_ -> throwE $ CMD PROHIBITED $ "joinConnSrv: bad connection " <> show cType
void $ sendInvitation c nm userId connId qInfo vrsn cReq cInfo
pure (False, service)
pure False
where
mkJoinInvitation rq@RcvQueue {clientService} pqInitKeys = do
mkJoinInvitation rq pqInitKeys = do
g <- asks random
AgentConfig {smpClientVRange = vr, smpAgentVRange, e2eEncryptVRange = e2eVR} <- asks config
let qUri = SMPQueueUri vr $ (rcvSMPQueueAddress rq) {queueMode = Just QMMessaging}
@@ -1231,7 +1246,7 @@ joinConnSrv c nm userId connId enableNtfs cReqUri@CRContactUri {} cInfo pqSup su
createRatchetX3dhKeys db connId pk1 pk2 pKem
pure e2eRcvParams
let cReq = CRInvitationUri crData $ toVersionRangeT e2eRcvParams e2eVR
pure (CCLink cReq Nothing, dbServiceId <$> clientService)
pure $ CCLink cReq Nothing
Nothing -> throwE $ AGENT A_VERSION
delInvSL :: AgentClient -> ConnId -> SMPServerWithAuth -> SMP.LinkId -> AM ()
@@ -1239,7 +1254,7 @@ delInvSL c connId srv lnkId =
withStore' c (\db -> deleteInvShortLink db (protoServer srv) lnkId) `catchE` \e ->
liftIO $ nonBlockingWriteTBQueue (subQ c) ("", connId, AEvt SAEConn (ERR $ INTERNAL $ "error deleting short link " <> show e))
joinConnSrvAsync :: AgentClient -> UserId -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> SMPServerWithAuth -> AM (SndQueueSecured, Maybe ClientServiceId)
joinConnSrvAsync :: AgentClient -> UserId -> ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> SMPServerWithAuth -> AM SndQueueSecured
joinConnSrvAsync c userId connId enableNtfs inv@CRInvitationUri {} cInfo pqSupport subMode srv = do
SomeConn cType conn <- withStore c (`getConn` connId)
case conn of
@@ -1251,7 +1266,7 @@ joinConnSrvAsync c userId connId enableNtfs inv@CRInvitationUri {} cInfo pqSuppo
| sqStatus == New || sqStatus == Secured -> doJoin (Just rq) (Just sq)
_ -> throwE $ CMD PROHIBITED $ "joinConnSrvAsync: bad connection " <> show cType
where
doJoin :: Maybe RcvQueue -> Maybe SndQueue -> AM (SndQueueSecured, Maybe ClientServiceId)
doJoin :: Maybe RcvQueue -> Maybe SndQueue -> AM SndQueueSecured
doJoin rq_ sq_ = do
(cData, sq, e2eSndParams, lnkId_) <- startJoinInvitation c userId connId sq_ enableNtfs inv pqSupport
secureConfirmQueueAsync c cData rq_ sq srv cInfo (Just e2eSndParams) subMode
@@ -1259,7 +1274,7 @@ joinConnSrvAsync c userId connId enableNtfs inv@CRInvitationUri {} cInfo pqSuppo
joinConnSrvAsync _c _userId _connId _enableNtfs (CRContactUri _) _cInfo _subMode _pqSupport _srv = do
throwE $ CMD PROHIBITED "joinConnSrvAsync"
createReplyQueue :: AgentClient -> NetworkRequestMode -> ConnData -> SndQueue -> SubscriptionMode -> SMPServerWithAuth -> AM (SMPQueueInfo, Maybe ClientServiceId)
createReplyQueue :: AgentClient -> NetworkRequestMode -> ConnData -> SndQueue -> SubscriptionMode -> SMPServerWithAuth -> AM SMPQueueInfo
createReplyQueue c nm ConnData {userId, connId, enableNtfs} SndQueue {smpClientVersion} subMode srv = do
ntfServer_ <- if enableNtfs then newQueueNtfServer else pure Nothing
(rq, qUri, tSess, sessId) <- newRcvQueue c nm userId connId srv (versionToRange smpClientVersion) SCMInvitation (isJust ntfServer_) subMode
@@ -1268,7 +1283,7 @@ createReplyQueue c nm ConnData {userId, connId, enableNtfs} SndQueue {smpClientV
rq' <- withStore c $ \db -> upgradeSndConnToDuplex db connId rq subMode
lift . when (subMode == SMSubscribe) $ addNewQueueSubscription c rq' tSess sessId
mapM_ (newQueueNtfSubscription c rq') ntfServer_
pure (qInfo, clientServiceId rq')
pure qInfo
-- | Approve confirmation (LET command) in Reader monad
allowConnection' :: AgentClient -> ConnId -> ConfirmationId -> ConnInfo -> AM ()
@@ -1281,7 +1296,7 @@ allowConnection' c connId confId ownConnInfo = withConnLock c connId "allowConne
_ -> throwE $ CMD PROHIBITED "allowConnection"
-- | Accept contact (ACPT command) in Reader monad
acceptContact' :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> Bool -> InvitationId -> ConnInfo -> PQSupport -> SubscriptionMode -> AM (SndQueueSecured, Maybe ClientServiceId)
acceptContact' :: AgentClient -> NetworkRequestMode -> UserId -> ConnId -> Bool -> InvitationId -> ConnInfo -> PQSupport -> SubscriptionMode -> AM SndQueueSecured
acceptContact' c nm userId connId enableNtfs invId ownConnInfo pqSupport subMode = withConnLock c connId "acceptContact" $ do
Invitation {connReq} <- withStore c $ \db -> getInvitation db "acceptContact'" invId
r <- joinConn c nm userId connId enableNtfs connReq ownConnInfo pqSupport subMode
@@ -1316,7 +1331,7 @@ databaseDiff passed known =
in DatabaseDiff {missingIds, extraIds}
-- | Subscribe to receive connection messages (SUB command) in Reader monad
subscribeConnection' :: AgentClient -> ConnId -> AM (Maybe ClientServiceId)
subscribeConnection' :: AgentClient -> ConnId -> AM ()
subscribeConnection' c connId = toConnResult connId =<< subscribeConnections' c [connId]
{-# INLINE subscribeConnection' #-}
@@ -1332,12 +1347,13 @@ type QDelResult = QCmdResult ()
type QSubResult = QCmdResult (Maybe SMP.ServiceId)
subscribeConnections' :: AgentClient -> [ConnId] -> AM (Map ConnId (Either AgentErrorType (Maybe ClientServiceId)))
subscribeConnections' :: AgentClient -> [ConnId] -> AM (Map ConnId (Either AgentErrorType ()))
subscribeConnections' _ [] = pure M.empty
subscribeConnections' c connIds = subscribeConnections_ c . zip connIds =<< withStore' c (`getConnSubs` connIds)
subscribeConnections_ :: AgentClient -> [(ConnId, Either StoreError SomeConnSub)] -> AM (Map ConnId (Either AgentErrorType (Maybe ClientServiceId)))
subscribeConnections_ :: AgentClient -> [(ConnId, Either StoreError SomeConnSub)] -> AM (Map ConnId (Either AgentErrorType ()))
subscribeConnections_ c conns = do
-- TODO [certs rcv] - it should exclude connections already associated, and then if some don't deliver any response they may be unassociated
let (subRs, cs) = foldr partitionResultsConns ([], []) conns
resumeDelivery cs
resumeConnCmds c $ map fst cs
@@ -1351,8 +1367,8 @@ subscribeConnections_ c conns = do
pure rs
where
partitionResultsConns :: (ConnId, Either StoreError SomeConnSub) ->
(Map ConnId (Either AgentErrorType (Maybe ClientServiceId)), [(ConnId, SomeConnSub)]) ->
(Map ConnId (Either AgentErrorType (Maybe ClientServiceId)), [(ConnId, SomeConnSub)])
(Map ConnId (Either AgentErrorType ()), [(ConnId, SomeConnSub)]) ->
(Map ConnId (Either AgentErrorType ()), [(ConnId, SomeConnSub)])
partitionResultsConns (connId, conn_) (rs, cs) = case conn_ of
Left e -> (M.insert connId (Left $ storeError e) rs, cs)
Right c'@(SomeConn _ conn) -> case conn of
@@ -1360,12 +1376,12 @@ subscribeConnections_ c conns = do
SndConnection _ sq -> (M.insert connId (sndSubResult sq) rs, cs')
RcvConnection _ _ -> (rs, cs')
ContactConnection _ _ -> (rs, cs')
NewConnection _ -> (M.insert connId (Right Nothing) rs, cs')
NewConnection _ -> (M.insert connId (Right ()) rs, cs')
where
cs' = (connId, c') : cs
sndSubResult :: SndQueue -> Either AgentErrorType (Maybe ClientServiceId)
sndSubResult :: SndQueue -> Either AgentErrorType ()
sndSubResult SndQueue {status} = case status of
Confirmed -> Right Nothing
Confirmed -> Right ()
Active -> Left $ CONN SIMPLEX "subscribeConnections"
_ -> Left $ INTERNAL "unexpected queue status"
rcvQueues :: (ConnId, SomeConnSub) -> [RcvQueueSub]
@@ -1386,9 +1402,9 @@ subscribeConnections_ c conns = do
order (_, Right _) = 3
order _ = 4
-- TODO [certs rcv] store associations of queues with client service ID
storeClientServiceAssocs :: Map ConnId (Either AgentErrorType (Maybe SMP.ServiceId)) -> AM (Map ConnId (Either AgentErrorType (Maybe ClientServiceId)))
storeClientServiceAssocs = pure . M.map (Nothing <$)
sendNtfCreate :: NtfSupervisor -> Map ConnId (Either AgentErrorType (Maybe ClientServiceId)) -> [(ConnId, SomeConnSub)] -> AM' ()
storeClientServiceAssocs :: Map ConnId (Either AgentErrorType (Maybe SMP.ServiceId)) -> AM (Map ConnId (Either AgentErrorType ()))
storeClientServiceAssocs = pure . M.map (() <$)
sendNtfCreate :: NtfSupervisor -> Map ConnId (Either AgentErrorType ()) -> [(ConnId, SomeConnSub)] -> AM' ()
sendNtfCreate ns rcvRs cs = do
let oks = M.keysSet $ M.filter (either temporaryAgentError $ const True) rcvRs
(csCreate, csDelete) = foldr (groupConnIds oks) ([], []) cs
@@ -1412,7 +1428,7 @@ subscribeConnections_ c conns = do
DuplexConnection _ _ sqs -> L.toList sqs
SndConnection _ sq -> [sq]
_ -> []
notifyResultError :: Map ConnId (Either AgentErrorType (Maybe ClientServiceId)) -> AM ()
notifyResultError :: Map ConnId (Either AgentErrorType ()) -> AM ()
notifyResultError rs = do
let actual = M.size rs
expected = length conns
@@ -1472,15 +1488,15 @@ subscribeAllConnections' c onlyNeeded activeUserId_ = handleErr $ do
sqs <- withStore' c getAllSndQueuesForDelivery
lift $ mapM_ (resumeMsgDelivery c) sqs
resubscribeConnection' :: AgentClient -> ConnId -> AM (Maybe ClientServiceId)
resubscribeConnection' :: AgentClient -> ConnId -> AM ()
resubscribeConnection' c connId = toConnResult connId =<< resubscribeConnections' c [connId]
{-# INLINE resubscribeConnection' #-}
resubscribeConnections' :: AgentClient -> [ConnId] -> AM (Map ConnId (Either AgentErrorType (Maybe ClientServiceId)))
resubscribeConnections' :: AgentClient -> [ConnId] -> AM (Map ConnId (Either AgentErrorType ()))
resubscribeConnections' _ [] = pure M.empty
resubscribeConnections' c connIds = do
conns <- zip connIds <$> withStore' c (`getConnSubs` connIds)
let r = M.fromList $ map (,Right Nothing) connIds -- TODO [certs rcv]
let r = M.fromList $ map (,Right ()) connIds
conns' <- filterM (fmap not . isActiveConn . snd) conns
-- union is left-biased, so results returned by subscribeConnections' take precedence
(`M.union` r) <$> subscribeConnections_ c conns'
@@ -1491,9 +1507,15 @@ resubscribeConnections' c connIds = do
[] -> pure True
rqs' -> anyM $ map (atomically . hasActiveSubscription c) rqs'
-- TODO [certs rcv]
subscribeClientService' :: AgentClient -> ClientServiceId -> AM Int
subscribeClientService' = undefined
-- TODO [certs rcv] compare hash with lock
subscribeClientServices' :: AgentClient -> UserId -> AM (Map SMPServer (Either AgentErrorType (Int64, IdsHash)))
subscribeClientServices' c userId =
ifM useService subscribe $ throwError $ CMD PROHIBITED "no user service allowed"
where
useService = liftIO $ (Just True ==) <$> TM.lookupIO userId (useClientServices c)
subscribe = do
srvs <- withStore' c (`getClientServiceServers` userId)
lift $ M.fromList . zip srvs <$> mapConcurrently (tryAllErrors' . subscribeClientService c userId) srvs
-- requesting messages sequentially, to reduce memory usage
getConnectionMessages' :: AgentClient -> NonEmpty ConnMsgReq -> AM' (NonEmpty (Either AgentErrorType (Maybe SMPMsgMeta)))
@@ -1655,13 +1677,13 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do
NEW enableNtfs (ACM cMode) pqEnc subMode -> noServer $ do
triedHosts <- newTVarIO S.empty
tryCommand . withNextSrv c userId storageSrvs triedHosts [] $ \srv -> do
(CCLink cReq _, service) <- newRcvConnSrv c NRMBackground userId connId enableNtfs cMode Nothing Nothing pqEnc subMode srv
notify $ INV (ACR cMode cReq) service
CCLink cReq _ <- newRcvConnSrv c NRMBackground userId connId enableNtfs cMode Nothing Nothing pqEnc subMode srv
notify $ INV (ACR cMode cReq)
JOIN enableNtfs (ACR _ cReq@(CRInvitationUri ConnReqUriData {crSmpQueues = q :| _} _)) pqEnc subMode connInfo -> noServer $ do
triedHosts <- newTVarIO S.empty
tryCommand . withNextSrv c userId storageSrvs triedHosts [qServer q] $ \srv -> do
(sqSecured, service) <- joinConnSrvAsync c userId connId enableNtfs cReq connInfo pqEnc subMode srv
notify $ JOINED sqSecured service
sqSecured <- joinConnSrvAsync c userId connId enableNtfs cReq connInfo pqEnc subMode srv
notify $ JOINED sqSecured
LET confId ownCInfo -> withServer' . tryCommand $ allowConnection' c connId confId ownCInfo >> notify OK
ACK msgId rcptInfo_ -> withServer' . tryCommand $ ackMessage' c connId msgId rcptInfo_ >> notify OK
SWCH ->
@@ -2818,7 +2840,7 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
SMP.SUB -> case respOrErr of
Right SMP.OK -> liftIO $ processSubOk rq upConnIds
-- TODO [certs rcv] associate queue with the service
Right (SMP.SOK serviceId_) -> liftIO $ processSubOk rq upConnIds
Right (SMP.SOK _serviceId_) -> liftIO $ processSubOk rq upConnIds
Right msg@SMP.MSG {} -> do
liftIO $ processSubOk rq upConnIds -- the connection is UP even when processing this particular message fails
runProcessSMP rq conn (toConnData conn) msg
@@ -3053,7 +3075,9 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
notifyEnd removed
| removed = notify END >> logServer "<--" c srv rId "END"
| otherwise = logServer "<--" c srv rId "END from disconnected client - ignored"
-- Possibly, we need to add some flag to connection that it was deleted
-- TODO [certs rcv]
r@(SMP.ENDS _) -> unexpected r
-- TODO [certs rcv] Possibly, we need to add some flag to connection that it was deleted
SMP.DELD -> atomically (removeSubscription c tSess connId rq) >> notify DELD
SMP.ERR e -> notify $ ERR $ SMP (B.unpack $ strEncode srv) e
r -> unexpected r
@@ -3439,22 +3463,22 @@ connectReplyQueues c cData@ConnData {userId, connId} ownConnInfo sq_ (qInfo :| _
(sq, _) <- lift $ newSndQueue userId connId qInfo' Nothing
withStore c $ \db -> upgradeRcvConnToDuplex db connId sq
secureConfirmQueueAsync :: AgentClient -> ConnData -> Maybe RcvQueue -> SndQueue -> SMPServerWithAuth -> ConnInfo -> Maybe (CR.SndE2ERatchetParams 'C.X448) -> SubscriptionMode -> AM (SndQueueSecured, Maybe ClientServiceId)
secureConfirmQueueAsync :: AgentClient -> ConnData -> Maybe RcvQueue -> SndQueue -> SMPServerWithAuth -> ConnInfo -> Maybe (CR.SndE2ERatchetParams 'C.X448) -> SubscriptionMode -> AM SndQueueSecured
secureConfirmQueueAsync c cData rq_ sq srv connInfo e2eEncryption_ subMode = do
sqSecured <- agentSecureSndQueue c NRMBackground cData sq
(qInfo, service) <- mkAgentConfirmation c NRMBackground cData rq_ sq srv connInfo subMode
qInfo <- mkAgentConfirmation c NRMBackground cData rq_ sq srv connInfo subMode
storeConfirmation c cData sq e2eEncryption_ qInfo
lift $ submitPendingMsg c sq
pure (sqSecured, service)
pure sqSecured
secureConfirmQueue :: AgentClient -> NetworkRequestMode -> ConnData -> Maybe RcvQueue -> SndQueue -> SMPServerWithAuth -> ConnInfo -> Maybe (CR.SndE2ERatchetParams 'C.X448) -> SubscriptionMode -> AM (SndQueueSecured, Maybe ClientServiceId)
secureConfirmQueue :: AgentClient -> NetworkRequestMode -> ConnData -> Maybe RcvQueue -> SndQueue -> SMPServerWithAuth -> ConnInfo -> Maybe (CR.SndE2ERatchetParams 'C.X448) -> SubscriptionMode -> AM SndQueueSecured
secureConfirmQueue c nm cData@ConnData {connId, connAgentVersion, pqSupport} rq_ sq srv connInfo e2eEncryption_ subMode = do
sqSecured <- agentSecureSndQueue c nm cData sq
(qInfo, service) <- mkAgentConfirmation c nm cData rq_ sq srv connInfo subMode
qInfo <- mkAgentConfirmation c nm cData rq_ sq srv connInfo subMode
msg <- mkConfirmation qInfo
void $ sendConfirmation c nm sq msg
withStore' c $ \db -> setSndQueueStatus db sq Confirmed
pure (sqSecured, service)
pure sqSecured
where
mkConfirmation :: AgentMessage -> AM MsgBody
mkConfirmation aMessage = do
@@ -3480,12 +3504,12 @@ agentSecureSndQueue c nm ConnData {connAgentVersion} sq@SndQueue {queueMode, sta
sndSecure = senderCanSecure queueMode
initiatorRatchetOnConf = connAgentVersion >= ratchetOnConfSMPAgentVersion
mkAgentConfirmation :: AgentClient -> NetworkRequestMode -> ConnData -> Maybe RcvQueue -> SndQueue -> SMPServerWithAuth -> ConnInfo -> SubscriptionMode -> AM (AgentMessage, Maybe ClientServiceId)
mkAgentConfirmation :: AgentClient -> NetworkRequestMode -> ConnData -> Maybe RcvQueue -> SndQueue -> SMPServerWithAuth -> ConnInfo -> SubscriptionMode -> AM AgentMessage
mkAgentConfirmation c nm cData rq_ sq srv connInfo subMode = do
(qInfo, service) <- case rq_ of
qInfo <- case rq_ of
Nothing -> createReplyQueue c nm cData sq subMode srv
Just rq@RcvQueue {smpClientVersion = v, clientService} -> pure (SMPQueueInfo v $ rcvSMPQueueAddress rq, dbServiceId <$> clientService)
pure (AgentConnInfoReply (qInfo :| []) connInfo, service)
Just rq@RcvQueue {smpClientVersion = v} -> pure $ SMPQueueInfo v $ rcvSMPQueueAddress rq
pure $ AgentConnInfoReply (qInfo :| []) connInfo
enqueueConfirmation :: AgentClient -> ConnData -> SndQueue -> ConnInfo -> Maybe (CR.SndE2ERatchetParams 'C.X448) -> AM ()
enqueueConfirmation c cData sq connInfo e2eEncryption_ = do
+55 -11
View File
@@ -49,6 +49,7 @@ module Simplex.Messaging.Agent.Client
newRcvQueue_,
subscribeQueues,
subscribeUserServerQueues,
subscribeClientService,
processClientNotices,
getQueueMessage,
decryptSMPMessage,
@@ -223,6 +224,7 @@ import Data.Text.Encoding
import Data.Time (UTCTime, addUTCTime, defaultTimeLocale, formatTime, getCurrentTime)
import Data.Time.Clock.System (getSystemTime)
import Data.Word (Word16)
import qualified Data.X509.Validation as XV
import Network.Socket (HostName)
import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError)
import qualified Simplex.FileTransfer.Client as X
@@ -238,7 +240,7 @@ import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Stats
import Simplex.Messaging.Agent.Store
import Simplex.Messaging.Agent.Store.AgentStore (getClientNotices, updateClientNotices)
import Simplex.Messaging.Agent.Store.AgentStore
import Simplex.Messaging.Agent.Store.Common (DBStore, withTransaction)
import qualified Simplex.Messaging.Agent.Store.DB as DB
import Simplex.Messaging.Agent.Store.Entity
@@ -262,6 +264,7 @@ import Simplex.Messaging.Protocol
NetworkError (..),
MsgFlags (..),
MsgId,
IdsHash,
NtfServer,
NtfServerWithAuth,
ProtoServer,
@@ -296,8 +299,9 @@ import Simplex.Messaging.Session
import Simplex.Messaging.SystemTime
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (SMPVersion, SessionId, THandleParams (sessionId, thVersion), TransportError (..), TransportPeer (..), sndAuthKeySMPVersion, shortLinksSMPVersion, newNtfCredsSMPVersion)
import Simplex.Messaging.Transport (SMPServiceRole (..), SMPVersion, ServiceCredentials (..), SessionId, THClientService' (..), THandleParams (sessionId, thVersion), TransportError (..), TransportPeer (..), sndAuthKeySMPVersion, shortLinksSMPVersion, newNtfCredsSMPVersion)
import Simplex.Messaging.Transport.Client (TransportHost (..))
import Simplex.Messaging.Transport.Credentials
import Simplex.Messaging.Util
import Simplex.Messaging.Version
import System.Mem.Weak (Weak, deRefWeak)
@@ -331,6 +335,7 @@ data AgentClient = AgentClient
msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg),
smpServers :: TMap UserId (UserServers 'PSMP),
smpClients :: TMap SMPTransportSession SMPClientVar,
useClientServices :: TMap UserId Bool,
-- smpProxiedRelays:
-- SMPTransportSession defines connection from proxy to relay,
-- SMPServerWithAuth defines client connected to SMP proxy (with the same userId and entityId in TransportSession)
@@ -495,7 +500,7 @@ data UserNetworkType = UNNone | UNCellular | UNWifi | UNEthernet | UNOther
-- | Creates an SMP agent client instance that receives commands and sends responses via 'TBQueue's.
newAgentClient :: Int -> InitialAgentServers -> UTCTime -> Map (Maybe SMPServer) (Maybe SystemSeconds) -> Env -> IO AgentClient
newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomains, presetServers} currentTs notices agentEnv = do
newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, useServices, presetDomains, presetServers} currentTs notices agentEnv = do
let cfg = config agentEnv
qSize = tbqSize cfg
proxySessTs <- newTVarIO =<< getCurrentTime
@@ -505,6 +510,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomai
msgQ <- newTBQueueIO qSize
smpServers <- newTVarIO $ M.map mkUserServers smp
smpClients <- TM.emptyIO
useClientServices <- newTVarIO useServices
smpProxiedRelays <- TM.emptyIO
ntfServers <- newTVarIO ntf
ntfClients <- TM.emptyIO
@@ -544,6 +550,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, presetDomai
msgQ,
smpServers,
smpClients,
useClientServices,
smpProxiedRelays,
ntfServers,
ntfClients,
@@ -598,6 +605,28 @@ agentDRG :: AgentClient -> TVar ChaChaDRG
agentDRG AgentClient {agentEnv = Env {random}} = random
{-# INLINE agentDRG #-}
getServiceCredentials :: AgentClient -> UserId -> SMPServer -> AM (Maybe (ServiceCredentials, Maybe ServiceId))
getServiceCredentials c userId srv =
liftIO (TM.lookupIO userId $ useClientServices c)
$>>= \useService -> if useService then Just <$> getService else pure Nothing
where
getService :: AM (ServiceCredentials, Maybe ServiceId)
getService = do
let g = agentDRG c
((C.KeyHash kh, serviceCreds), serviceId_) <-
withStore' c $ \db ->
getClientService db userId srv >>= \case
Just service -> pure service
Nothing -> do
cred <- genCredentials g Nothing (25, 24 * 999999) "simplex"
let tlsCreds = tlsCredentials [cred]
createClientService db userId srv tlsCreds
pure (tlsCreds, Nothing)
(_, pk) <- atomically $ C.generateKeyPair g
let serviceSignKey = C.APrivateSignKey C.SEd25519 pk
creds = ServiceCredentials {serviceRole = SRMessaging, serviceCreds, serviceCertHash = XV.Fingerprint kh, serviceSignKey}
pure (creds, serviceId_)
class (Encoding err, Show err) => ProtocolServerClient v err msg | msg -> v, msg -> err where
type Client msg = c | c -> msg
getProtocolServerClient :: AgentClient -> NetworkRequestMode -> TransportSession msg -> AM (Client msg)
@@ -701,7 +730,7 @@ getSMPProxyClient c@AgentClient {active, smpClients, smpProxiedRelays, workerSeq
Nothing -> Left $ BROKER (B.unpack $ strEncode srv) TIMEOUT
smpConnectClient :: AgentClient -> NetworkRequestMode -> SMPTransportSession -> TMap SMPServer ProxiedRelayVar -> SMPClientVar -> AM SMPConnectedClient
smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm tSess@(_, srv, _) prs v =
smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm tSess@(userId, srv, _) prs v =
newProtocolClient c tSess smpClients connectClient v
`catchAllErrors` \e -> lift (resubscribeSMPSession c tSess) >> throwE e
where
@@ -709,12 +738,22 @@ smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm
connectClient v' = do
cfg <- lift $ getClientConfig c smpCfg
g <- asks random
service <- getServiceCredentials c userId srv
let cfg' = cfg {serviceCredentials = fst <$> service}
env <- ask
liftError (protocolClientError SMP $ B.unpack $ strEncode srv) $ do
smp <- liftError (protocolClientError SMP $ B.unpack $ strEncode srv) $ do
ts <- readTVarIO proxySessTs
smp <- ExceptT $ getProtocolClient g nm tSess cfg presetDomains (Just msgQ) ts $ smpClientDisconnected c tSess env v' prs
atomically $ SS.setSessionId tSess (sessionId $ thParams smp) $ currentSubs c
pure SMPConnectedClient {connectedClient = smp, proxiedRelays = prs}
ExceptT $ getProtocolClient g nm tSess cfg' presetDomains (Just msgQ) ts $ smpClientDisconnected c tSess env v' prs
atomically $ SS.setSessionId tSess (sessionId $ thParams smp) $ currentSubs c
updateClientService service smp
pure SMPConnectedClient {connectedClient = smp, proxiedRelays = prs}
updateClientService service smp = case (service, smpClientService smp) of
(Just (_, serviceId_), Just THClientService {serviceId})
| serviceId_ /= Just serviceId -> withStore' c $ \db -> setClientServiceId db userId srv serviceId
| otherwise -> pure ()
(Just _, Nothing) -> withStore' c $ \db -> deleteClientService db userId srv -- e.g., server version downgrade
(Nothing, Just _) -> logError "server returned serviceId without service credentials in request"
(Nothing, Nothing) -> pure ()
smpClientDisconnected :: AgentClient -> SMPTransportSession -> Env -> SMPClientVar -> TMap SMPServer ProxiedRelayVar -> SMPClient -> IO ()
smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess@(userId, srv, cId) env v prs client = do
@@ -862,7 +901,6 @@ waitForProtocolClient c nm tSess@(_, srv, _) clients v = do
(throwE e)
Nothing -> throwE $ BROKER (B.unpack $ strEncode srv) TIMEOUT
-- clientConnected arg is only passed for SMP server
newProtocolClient ::
forall v err msg.
(ProtocolTypeI (ProtoType msg), ProtocolServerClient v err msg) =>
@@ -1399,7 +1437,8 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl
withClient c nm tSess $ \(SMPConnectedClient smp _) -> do
(ntfKeys, ntfCreds) <- liftIO $ mkNtfCreds a g smp
(thParams smp,ntfKeys,) <$> createSMPQueue smp nm nonce_ rKeys dhKey auth subMode (queueReqData cqrd) ntfCreds
-- TODO [certs rcv] validate that serviceId is the same as in the client session
-- TODO [certs rcv] validate that serviceId is the same as in the client session, fail otherwise
-- possibly, it should allow returning Nothing - it would indicate incorrect old version
liftIO . logServer "<--" c srv NoEntity $ B.unwords ["IDS", logSecret rcvId, logSecret sndId]
shortLink <- mkShortLinkCreds thParams' qik
let rq =
@@ -1415,7 +1454,7 @@ newRcvQueue_ c nm userId connId (ProtoServerWithAuth srv auth) vRange cqrd enabl
sndId,
queueMode,
shortLink,
clientService = ClientService DBNewEntity <$> serviceId,
rcvServiceAssoc = isJust serviceId,
status = New,
enableNtfs,
clientNoticeId = Nothing,
@@ -1650,6 +1689,11 @@ processClientNotices c@AgentClient {presetServers} tSess notices = do
logError $ "processClientNotices error: " <> tshow e
notifySub' c "" $ ERR e
subscribeClientService :: AgentClient -> UserId -> SMPServer -> AM (Int64, IdsHash)
subscribeClientService c userId srv =
withLogClient c NRMBackground (userId, srv, Nothing) B.empty "SUBS" $
(`subscribeService` SMP.SRecipientService) . connectedClient
activeClientSession :: AgentClient -> SMPTransportSession -> SessionId -> STM Bool
activeClientSession c tSess sessId = sameSess <$> tryReadSessVar tSess (smpClients c)
where
@@ -90,6 +90,7 @@ data InitialAgentServers = InitialAgentServers
ntf :: [NtfServer],
xftp :: Map UserId (NonEmpty (ServerCfg 'PXFTP)),
netCfg :: NetworkConfig,
useServices :: Map UserId Bool,
presetDomains :: [HostName],
presetServers :: [SMPServer]
}
+2 -16
View File
@@ -126,9 +126,6 @@ module Simplex.Messaging.Agent.Protocol
ContactConnType (..),
ShortLinkScheme (..),
LinkKey (..),
StoredClientService (..),
ClientService,
ClientServiceId,
sameConnReqContact,
sameShortLinkContact,
simplexChat,
@@ -212,7 +209,6 @@ import Simplex.FileTransfer.Transport (XFTPErrorType)
import Simplex.FileTransfer.Types (FileErrorType)
import Simplex.Messaging.Agent.QueryString
import Simplex.Messaging.Agent.Store.DB (Binary (..), FromField (..), ToField (..), blobFieldDecoder, fromTextField_)
import Simplex.Messaging.Agent.Store.Entity
import Simplex.Messaging.Client (ProxyClientError)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.Ratchet
@@ -381,7 +377,7 @@ type SndQueueSecured = Bool
-- | Parameterized type for SMP agent events
data AEvent (e :: AEntity) where
INV :: AConnectionRequestUri -> Maybe ClientServiceId -> AEvent AEConn
INV :: AConnectionRequestUri -> AEvent AEConn
CONF :: ConfirmationId -> PQSupport -> [SMPServer] -> ConnInfo -> AEvent AEConn -- ConnInfo is from sender, [SMPServer] will be empty only in v1 handshake
REQ :: InvitationId -> PQSupport -> NonEmpty SMPServer -> ConnInfo -> AEvent AEConn -- ConnInfo is from sender
INFO :: PQSupport -> ConnInfo -> AEvent AEConn
@@ -407,7 +403,7 @@ data AEvent (e :: AEntity) where
DEL_USER :: Int64 -> AEvent AENone
STAT :: ConnectionStats -> AEvent AEConn
OK :: AEvent AEConn
JOINED :: SndQueueSecured -> Maybe ClientServiceId -> AEvent AEConn
JOINED :: SndQueueSecured -> AEvent AEConn
ERR :: AgentErrorType -> AEvent AEConn
ERRS :: NonEmpty (ConnId, AgentErrorType) -> AEvent AENone
SUSPENDED :: AEvent AENone
@@ -1783,16 +1779,6 @@ instance Encoding UserLinkData where
smpP = UserLinkData <$> ((A.char '\255' *> (unLarge <$> smpP)) <|> smpP)
{-# INLINE smpP #-}
data StoredClientService (s :: DBStored) = ClientService
{ dbServiceId :: DBEntityId' s,
serviceId :: SMP.ServiceId
}
deriving (Eq, Show)
type ClientService = StoredClientService 'DBStored
type ClientServiceId = DBEntityId
-- | SMP queue status.
data QueueStatus
= -- | queue is created
+2 -4
View File
@@ -85,7 +85,7 @@ data StoredRcvQueue (q :: DBStored) = RcvQueue
-- | short link ID and credentials
shortLink :: Maybe ShortLinkCreds,
-- | associated client service
clientService :: Maybe (StoredClientService q),
rcvServiceAssoc :: ServiceAssoc,
-- | queue status
status :: QueueStatus,
-- | to enable notifications for this queue - this field is duplicated from ConnData
@@ -134,9 +134,7 @@ data ShortLinkCreds = ShortLinkCreds
}
deriving (Show)
clientServiceId :: RcvQueue -> Maybe ClientServiceId
clientServiceId = fmap dbServiceId . clientService
{-# INLINE clientServiceId #-}
type ServiceAssoc = Bool
rcvSMPQueueAddress :: RcvQueue -> SMPQueueAddress
rcvSMPQueueAddress RcvQueue {server, sndId, e2ePrivKey, queueMode} =
@@ -35,6 +35,14 @@ module Simplex.Messaging.Agent.Store.AgentStore
deleteUsersWithoutConns,
checkUser,
-- * Client services
createClientService,
getClientService,
getClientServiceServers,
setClientServiceId,
deleteClientService,
deleteClientServices,
-- * Queues and connections
createNewConn,
updateNewConnRcv,
@@ -274,7 +282,9 @@ import qualified Data.Set as S
import Data.Text.Encoding (decodeLatin1, encodeUtf8)
import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, getCurrentTime)
import Data.Word (Word32)
import qualified Data.X509 as X
import Network.Socket (ServiceName)
import qualified Network.TLS as TLS
import Simplex.FileTransfer.Client (XFTPChunkSpec (..))
import Simplex.FileTransfer.Description
import Simplex.FileTransfer.Protocol (FileParty (..), SFileParty (..))
@@ -390,6 +400,75 @@ deleteUsersWithoutConns db = do
forM_ userIds $ DB.execute db "DELETE FROM users WHERE user_id = ?" . Only
pure userIds
createClientService :: DB.Connection -> UserId -> SMPServer -> (C.KeyHash, TLS.Credential) -> IO ()
createClientService db userId srv (kh, (cert, pk)) =
DB.execute
db
[sql|
INSERT INTO client_services
(user_id, host, port, service_cert_hash, service_cert, service_priv_key)
VALUES (?,?,?,?,?,?)
ON CONFLICT (user_id, host, port)
DO UPDATE SET
service_cert_hash = EXCLUDED.service_cert_hash,
service_cert = EXCLUDED.service_cert,
service_priv_key = EXCLUDED.service_priv_key,
rcv_service_id = NULL
|]
(userId, host srv, port srv, kh, cert, pk)
getClientService :: DB.Connection -> UserId -> SMPServer -> IO (Maybe ((C.KeyHash, TLS.Credential), Maybe ServiceId))
getClientService db userId srv =
maybeFirstRow toService $
DB.query
db
[sql|
SELECT service_cert_hash, service_cert, service_priv_key, rcv_service_id
FROM client_services
WHERE user_id = ? AND host = ? AND port = ?
|]
(userId, host srv, port srv)
where
toService (kh, cert, pk, serviceId_) = ((kh, (cert, pk)), serviceId_)
getClientServiceServers :: DB.Connection -> UserId -> IO [SMPServer]
getClientServiceServers db userId =
map toServer
<$> DB.query
db
[sql|
SELECT c.host, c.port, s.key_hash
FROM client_services c
JOIN servers s ON s.host = c.host AND s.port = c.port
|]
(Only userId)
where
toServer (host, port, kh) = SMPServer host port kh
setClientServiceId :: DB.Connection -> UserId -> SMPServer -> ServiceId -> IO ()
setClientServiceId db userId srv serviceId =
DB.execute
db
[sql|
UPDATE client_services
SET rcv_service_id = ?
WHERE user_id = ? AND host = ? AND port = ?
|]
(serviceId, userId, host srv, port srv)
deleteClientService :: DB.Connection -> UserId -> SMPServer -> IO ()
deleteClientService db userId srv =
DB.execute
db
[sql|
DELETE FROM client_services
WHERE user_id = ? AND host = ? AND port = ?
|]
(userId, host srv, port srv)
deleteClientServices :: DB.Connection -> UserId -> IO ()
deleteClientServices db userId = DB.execute db "DELETE FROM client_services WHERE user_id = ?" (Only userId)
createConn_ ::
TVar ChaChaDRG ->
ConnData ->
@@ -1926,6 +2005,15 @@ deriving newtype instance ToField ChunkReplicaId
deriving newtype instance FromField ChunkReplicaId
instance ToField X.CertificateChain where toField = toField . Binary . smpEncode . C.encodeCertChain
instance FromField X.CertificateChain where fromField = blobFieldDecoder (parseAll C.certChainP)
instance ToField X.PrivKey where toField = toField . Binary . C.encodeASNObj
instance FromField X.PrivKey where
fromField = blobFieldDecoder $ C.decodeASNKey >=> \case (pk, []) -> Right pk; r -> C.asnKeyError r
fromOnlyBI :: Only BoolInt -> Bool
fromOnlyBI (Only (BI b)) = b
{-# INLINE fromOnlyBI #-}
@@ -2005,19 +2093,18 @@ insertRcvQueue_ db connId' rq@RcvQueue {..} subMode serverKeyHash_ = do
db
[sql|
INSERT INTO rcv_queues
( host, port, rcv_id, conn_id, rcv_private_key, rcv_dh_secret, e2e_priv_key, e2e_dh_secret,
( host, port, rcv_id, rcv_service_assoc, conn_id, rcv_private_key, rcv_dh_secret, e2e_priv_key, e2e_dh_secret,
snd_id, queue_mode, status, to_subscribe, rcv_queue_id, rcv_primary, replace_rcv_queue_id, smp_client_version, server_key_hash,
link_id, link_key, link_priv_sig_key, link_enc_fixed_data,
ntf_public_key, ntf_private_key, ntf_id, rcv_ntf_dh_secret
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?);
|]
( (host server, port server, rcvId, connId', rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret)
( (host server, port server, rcvId, rcvServiceAssoc, connId', rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret)
:. (sndId, queueMode, status, BI toSubscribe, qId, BI primary, dbReplaceQueueId, smpClientVersion, serverKeyHash_)
:. (shortLinkId <$> shortLink, shortLinkKey <$> shortLink, linkPrivSigKey <$> shortLink, linkEncFixedData <$> shortLink)
:. ntfCredsFields
)
-- TODO [certs rcv] save client service
pure (rq :: NewRcvQueue) {connId = connId', dbQueueId = qId, clientService = Nothing}
pure (rq :: NewRcvQueue) {connId = connId', dbQueueId = qId}
where
toSubscribe = subMode == SMOnlyCreate
ntfCredsFields = case clientNtfCreds of
@@ -2371,7 +2458,7 @@ rcvQueueQuery =
[sql|
SELECT c.user_id, COALESCE(q.server_key_hash, s.key_hash), q.conn_id, q.host, q.port, q.rcv_id, q.rcv_private_key, q.rcv_dh_secret,
q.e2e_priv_key, q.e2e_dh_secret, q.snd_id, q.queue_mode, q.status, c.enable_ntfs, q.client_notice_id,
q.rcv_queue_id, q.rcv_primary, q.replace_rcv_queue_id, q.switch_status, q.smp_client_version, q.delete_errors,
q.rcv_queue_id, q.rcv_primary, q.replace_rcv_queue_id, q.switch_status, q.smp_client_version, q.delete_errors, q.rcv_service_assoc,
q.ntf_public_key, q.ntf_private_key, q.ntf_id, q.rcv_ntf_dh_secret,
q.link_id, q.link_key, q.link_priv_sig_key, q.link_enc_fixed_data
FROM rcv_queues q
@@ -2381,13 +2468,13 @@ rcvQueueQuery =
toRcvQueue ::
(UserId, C.KeyHash, ConnId, NonEmpty TransportHost, ServiceName, SMP.RecipientId, SMP.RcvPrivateAuthKey, SMP.RcvDhSecret, C.PrivateKeyX25519, Maybe C.DhSecretX25519, SMP.SenderId, Maybe QueueMode)
:. (QueueStatus, Maybe BoolInt, Maybe NoticeId, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int)
:. (QueueStatus, Maybe BoolInt, Maybe NoticeId, DBEntityId, BoolInt, Maybe Int64, Maybe RcvSwitchStatus, Maybe VersionSMPC, Int, ServiceAssoc)
:. (Maybe SMP.NtfPublicAuthKey, Maybe SMP.NtfPrivateAuthKey, Maybe SMP.NotifierId, Maybe RcvNtfDhSecret)
:. (Maybe SMP.LinkId, Maybe LinkKey, Maybe C.PrivateKeyEd25519, Maybe EncDataBytes) ->
RcvQueue
toRcvQueue
( (userId, keyHash, connId, host, port, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode)
:. (status, enableNtfs_, clientNoticeId, dbQueueId, BI primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion_, deleteErrors)
:. (status, enableNtfs_, clientNoticeId, dbQueueId, BI primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion_, deleteErrors, rcvServiceAssoc)
:. (ntfPublicKey_, ntfPrivateKey_, notifierId_, rcvNtfDhSecret_)
:. (shortLinkId_, shortLinkKey_, linkPrivSigKey_, linkEncFixedData_)
) =
@@ -2401,7 +2488,7 @@ toRcvQueue
_ -> Nothing
enableNtfs = maybe True unBI enableNtfs_
-- TODO [certs rcv] read client service
in RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode, shortLink, clientService = Nothing, status, enableNtfs, clientNoticeId, dbQueueId, primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion, clientNtfCreds, deleteErrors}
in RcvQueue {userId, connId, server, rcvId, rcvPrivateKey, rcvDhSecret, e2ePrivKey, e2eDhSecret, sndId, queueMode, shortLink, rcvServiceAssoc, status, enableNtfs, clientNoticeId, dbQueueId, primary, dbReplaceQueueId, rcvSwchStatus, smpClientVersion, clientNtfCreds, deleteErrors}
-- | returns all connection queue credentials, the first queue is the primary one
getRcvQueueSubsByConnId_ :: DB.Connection -> ConnId -> IO (Maybe (NonEmpty RcvQueueSub))
@@ -46,6 +46,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250322_short_links
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250702_conn_invitations_remove_cascade_delete
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251010_client_notices
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251020_service_certs
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
schemaMigrations :: [(String, Query, Maybe Query)]
@@ -91,7 +92,8 @@ schemaMigrations =
("m20250322_short_links", m20250322_short_links, Just down_m20250322_short_links),
("m20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete),
("m20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe),
("m20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices)
("m20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices),
("m20251020_service_certs", m20251020_service_certs, Just down_m20251020_service_certs)
]
-- | The list of migrations in ascending order by date
@@ -1,40 +0,0 @@
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250517_service_certs where
import Database.SQLite.Simple (Query)
import Database.SQLite.Simple.QQ (sql)
-- TODO move date forward, create migration for postgres
m20250517_service_certs :: Query
m20250517_service_certs =
[sql|
CREATE TABLE server_certs(
server_cert_id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users ON UPDATE RESTRICT ON DELETE CASCADE,
host TEXT NOT NULL,
port TEXT NOT NULL,
certificate BLOB NOT NULL,
priv_key BLOB NOT NULL,
service_id BLOB,
FOREIGN KEY(host, port) REFERENCES servers ON UPDATE CASCADE ON DELETE RESTRICT,
);
CREATE UNIQUE INDEX idx_server_certs_user_id_host_port ON server_certs(user_id, host, port);
CREATE INDEX idx_server_certs_host_port ON server_certs(host, port);
ALTER TABLE rcv_queues ADD COLUMN rcv_service_id BLOB;
|]
down_m20250517_service_certs :: Query
down_m20250517_service_certs =
[sql|
ALTER TABLE rcv_queues DROP COLUMN rcv_service_id;
DROP INDEX idx_server_certs_host_port;
DROP INDEX idx_server_certs_user_id_host_port;
DROP TABLE server_certs;
|]
@@ -0,0 +1,40 @@
{-# LANGUAGE QuasiQuotes #-}
module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251020_service_certs where
import Database.SQLite.Simple (Query)
import Database.SQLite.Simple.QQ (sql)
-- TODO move date forward, create migration for postgres
m20251020_service_certs :: Query
m20251020_service_certs =
[sql|
CREATE TABLE client_services(
user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE,
host TEXT NOT NULL,
port TEXT NOT NULL,
service_cert BLOB NOT NULL,
service_cert_hash BLOB NOT NULL,
service_priv_key BLOB NOT NULL,
rcv_service_id BLOB,
FOREIGN KEY(host, port) REFERENCES servers ON UPDATE CASCADE ON DELETE RESTRICT
);
CREATE UNIQUE INDEX idx_server_certs_user_id_host_port ON client_services(user_id, host, port);
CREATE INDEX idx_server_certs_host_port ON client_services(host, port);
ALTER TABLE rcv_queues ADD COLUMN rcv_service_assoc INTEGER NOT NULL DEFAULT 0;
|]
down_m20251020_service_certs :: Query
down_m20251020_service_certs =
[sql|
ALTER TABLE rcv_queues DROP COLUMN rcv_service_assoc;
DROP INDEX idx_server_certs_host_port;
DROP INDEX idx_server_certs_user_id_host_port;
DROP TABLE client_services;
|]
@@ -63,6 +63,7 @@ CREATE TABLE rcv_queues(
to_subscribe INTEGER NOT NULL DEFAULT 0,
client_notice_id INTEGER
REFERENCES client_notices ON UPDATE RESTRICT ON DELETE SET NULL,
rcv_service_assoc INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY(host, port, rcv_id),
FOREIGN KEY(host, port) REFERENCES servers
ON DELETE RESTRICT ON UPDATE CASCADE,
@@ -450,6 +451,16 @@ CREATE TABLE client_notices(
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE TABLE client_services(
user_id INTEGER NOT NULL REFERENCES users ON DELETE CASCADE,
host TEXT NOT NULL,
port TEXT NOT NULL,
service_cert BLOB NOT NULL,
service_cert_hash BLOB NOT NULL,
service_priv_key BLOB NOT NULL,
rcv_service_id BLOB,
FOREIGN KEY(host, port) REFERENCES servers ON UPDATE CASCADE ON DELETE RESTRICT
);
CREATE UNIQUE INDEX idx_rcv_queues_ntf ON rcv_queues(host, port, ntf_id);
CREATE UNIQUE INDEX idx_rcv_queue_id ON rcv_queues(conn_id, rcv_queue_id);
CREATE UNIQUE INDEX idx_snd_queue_id ON snd_queues(conn_id, snd_queue_id);
@@ -593,3 +604,9 @@ CREATE UNIQUE INDEX idx_client_notices_entity ON client_notices(
entity_id
);
CREATE INDEX idx_rcv_queues_client_notice_id ON rcv_queues(client_notice_id);
CREATE UNIQUE INDEX idx_server_certs_user_id_host_port ON client_services(
user_id,
host,
port
);
CREATE INDEX idx_server_certs_host_port ON client_services(host, port);
+2 -2
View File
@@ -909,12 +909,12 @@ nsubResponse_ = \case
{-# INLINE nsubResponse_ #-}
-- This command is always sent in background request mode
subscribeService :: forall p. (PartyI p, ServiceParty p) => SMPClient -> SParty p -> ExceptT SMPClientError IO Int64
subscribeService :: forall p. (PartyI p, ServiceParty p) => SMPClient -> SParty p -> ExceptT SMPClientError IO (Int64, IdsHash)
subscribeService c party = case smpClientService c of
Just THClientService {serviceId, serviceKey} -> do
liftIO $ enablePings c
sendSMPCommand c NRMBackground (Just (C.APrivateAuthKey C.SEd25519 serviceKey)) serviceId subCmd >>= \case
SOKS n -> pure n
SOKS n idsHash -> pure (n, idsHash)
r -> throwE $ unexpectedResponse r
where
subCmd :: Command p
+3 -3
View File
@@ -479,14 +479,14 @@ smpSubscribeService ca smp srv serviceSub@(serviceId, _) = case smpClientService
(True <$ processSubscription r)
(pure False)
if ok
then case r of
Right n -> notify ca $ CAServiceSubscribed srv serviceSub n
then case r of -- TODO [certs rcv] compare hash
Right (n, _idsHash) -> notify ca $ CAServiceSubscribed srv serviceSub n
Left e
| smpClientServiceError e -> notifyUnavailable
| temporaryClientError e -> reconnectClient ca srv
| otherwise -> notify ca $ CAServiceSubError srv serviceSub e
else reconnectClient ca srv
processSubscription = mapM_ $ \n -> do
processSubscription = mapM_ $ \(n, _idsHash) -> do -- TODO [certs rcv] validate hash here?
setActiveServiceSub ca srv $ Just ((serviceId, n), sessId)
setPendingServiceSub ca srv Nothing
serviceAvailable THClientService {serviceRole, serviceId = serviceId'} =
+10 -8
View File
@@ -87,6 +87,8 @@ module Simplex.Messaging.Crypto
signatureKeyPair,
publicToX509,
encodeASNObj,
decodeASNKey,
asnKeyError,
-- * key encoding/decoding
encodePubKey,
@@ -1493,11 +1495,11 @@ encodeASNObj k = toStrict . encodeASN1 DER $ toASN1 k []
-- Decoding of binary X509 'CryptoPublicKey'.
decodePubKey :: CryptoPublicKey k => ByteString -> Either String k
decodePubKey = decodeKey >=> x509ToPublic >=> pubKey
decodePubKey = decodeASNKey >=> x509ToPublic >=> pubKey
-- Decoding of binary PKCS8 'PrivateKey'.
decodePrivKey :: CryptoPrivateKey k => ByteString -> Either String k
decodePrivKey = decodeKey >=> x509ToPrivate >=> privKey
decodePrivKey = decodeASNKey >=> x509ToPrivate >=> privKey
x509ToPublic :: (X.PubKey, [ASN1]) -> Either String APublicKey
x509ToPublic = \case
@@ -1505,7 +1507,7 @@ x509ToPublic = \case
(X.PubKeyEd448 k, []) -> Right . APublicKey SEd448 $ PublicKeyEd448 k
(X.PubKeyX25519 k, []) -> Right . APublicKey SX25519 $ PublicKeyX25519 k
(X.PubKeyX448 k, []) -> Right . APublicKey SX448 $ PublicKeyX448 k
r -> keyError r
r -> asnKeyError r
x509ToPublic' :: CryptoPublicKey k => X.PubKey -> Either String k
x509ToPublic' k = x509ToPublic (k, []) >>= pubKey
@@ -1517,16 +1519,16 @@ x509ToPrivate = \case
(X.PrivKeyEd448 k, []) -> Right $ APrivateKey SEd448 $ PrivateKeyEd448 k
(X.PrivKeyX25519 k, []) -> Right $ APrivateKey SX25519 $ PrivateKeyX25519 k
(X.PrivKeyX448 k, []) -> Right $ APrivateKey SX448 $ PrivateKeyX448 k
r -> keyError r
r -> asnKeyError r
x509ToPrivate' :: CryptoPrivateKey k => X.PrivKey -> Either String k
x509ToPrivate' pk = x509ToPrivate (pk, []) >>= privKey
{-# INLINE x509ToPrivate' #-}
decodeKey :: ASN1Object a => ByteString -> Either String (a, [ASN1])
decodeKey = fromASN1 <=< first show . decodeASN1 DER . fromStrict
decodeASNKey :: ASN1Object a => ByteString -> Either String (a, [ASN1])
decodeASNKey = fromASN1 <=< first show . decodeASN1 DER . fromStrict
keyError :: (a, [ASN1]) -> Either String b
keyError = \case
asnKeyError :: (a, [ASN1]) -> Either String b
asnKeyError = \case
(_, []) -> Left "unknown key algorithm"
_ -> Left "more than one key"
+18 -3
View File
@@ -140,6 +140,7 @@ module Simplex.Messaging.Protocol
RcvMessage (..),
MsgId,
MsgBody,
IdsHash,
MaxMessageLen,
MaxRcvMessageLen,
EncRcvMsgBody (..),
@@ -698,11 +699,13 @@ data BrokerMsg where
-- | Service subscription success - confirms when queue was associated with the service
SOK :: Maybe ServiceId -> BrokerMsg
-- | The number of queues subscribed with SUBS command
SOKS :: Int64 -> BrokerMsg
SOKS :: Int64 -> IdsHash -> BrokerMsg
-- MSG v1/2 has to be supported for encoding/decoding
-- v1: MSG :: MsgId -> SystemTime -> MsgBody -> BrokerMsg
-- v2: MsgId -> SystemTime -> MsgFlags -> MsgBody -> BrokerMsg
MSG :: RcvMessage -> BrokerMsg
-- sent once delivering messages to SUBS command is complete
SALL :: BrokerMsg
NID :: NotifierId -> RcvNtfPublicDhKey -> BrokerMsg
NMSG :: C.CbNonce -> EncNMsgMeta -> BrokerMsg
-- Should include certificate chain
@@ -939,6 +942,7 @@ data BrokerMsgTag
| SOK_
| SOKS_
| MSG_
| SALL_
| NID_
| NMSG_
| PKEY_
@@ -1031,6 +1035,7 @@ instance Encoding BrokerMsgTag where
SOK_ -> "SOK"
SOKS_ -> "SOKS"
MSG_ -> "MSG"
SALL_ -> "SALL"
NID_ -> "NID"
NMSG_ -> "NMSG"
PKEY_ -> "PKEY"
@@ -1052,6 +1057,7 @@ instance ProtocolMsgTag BrokerMsgTag where
"SOK" -> Just SOK_
"SOKS" -> Just SOKS_
"MSG" -> Just MSG_
"SALL" -> Just SALL_
"NID" -> Just NID_
"NMSG" -> Just NMSG_
"PKEY" -> Just PKEY_
@@ -1454,6 +1460,8 @@ type MsgId = ByteString
-- | SMP message body.
type MsgBody = ByteString
type IdsHash = ByteString
data ProtocolErrorType = PECmdSyntax | PECmdUnknown | PESession | PEBlock
-- | Type for protocol errors.
@@ -1834,9 +1842,12 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
SOK serviceId_
| v >= serviceCertsSMPVersion -> e (SOK_, ' ', serviceId_)
| otherwise -> e OK_ -- won't happen, the association with the service requires v >= serviceCertsSMPVersion
SOKS n -> e (SOKS_, ' ', n)
SOKS n idsHash
| v >= rcvServiceSMPVersion -> e (SOKS_, ' ', n, idsHash)
| otherwise -> e (SOKS_, ' ', n)
MSG RcvMessage {msgId, msgBody = EncRcvMsgBody body} ->
e (MSG_, ' ', msgId, Tail body)
SALL -> e SALL_
NID nId srvNtfDh -> e (NID_, ' ', nId, srvNtfDh)
NMSG nmsgNonce encNMsgMeta -> e (NMSG_, ' ', nmsgNonce, encNMsgMeta)
PKEY sid vr certKey -> e (PKEY_, ' ', sid, vr, certKey)
@@ -1867,6 +1878,7 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
MSG . RcvMessage msgId <$> bodyP
where
bodyP = EncRcvMsgBody . unTail <$> smpP
SALL_ -> pure SALL
IDS_
| v >= newNtfCredsSMPVersion -> ids smpP smpP smpP smpP
| v >= serviceCertsSMPVersion -> ids smpP smpP smpP nothing
@@ -1887,7 +1899,9 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
pure $ IDS QIK {rcvId, sndId, rcvPublicDhKey, queueMode, linkId, serviceId, serverNtfCreds}
LNK_ -> LNK <$> _smpP <*> smpP
SOK_ -> SOK <$> _smpP
SOKS_ -> SOKS <$> _smpP
SOKS_
| v >= rcvServiceSMPVersion -> SOKS <$> _smpP <*> smpP
| otherwise -> SOKS <$> _smpP <*> pure B.empty
NID_ -> NID <$> _smpP <*> smpP
NMSG_ -> NMSG <$> _smpP <*> smpP
PKEY_ -> PKEY <$> _smpP <*> smpP <*> smpP
@@ -1917,6 +1931,7 @@ instance ProtocolEncoding SMPVersion ErrorType BrokerMsg where
PONG -> noEntityMsg
PKEY {} -> noEntityMsg
RRES _ -> noEntityMsg
SALL -> noEntityMsg
-- other broker responses must have queue ID
_
| B.null entId -> Left $ CMD NO_ENTITY
+146 -72
View File
@@ -1359,7 +1359,7 @@ client
-- TODO [certs rcv] rcv subscriptions
Server {subscribers, ntfSubscribers}
ms
clnt@Client {clientId, ntfSubscriptions, ntfServiceSubscribed, serviceSubsCount = _todo', ntfServiceSubsCount, rcvQ, sndQ, clientTHParams = thParams'@THandleParams {sessionId}, procThreads} = do
clnt@Client {clientId, rcvQ, sndQ, msgQ, clientTHParams = thParams'@THandleParams {sessionId}, procThreads} = do
labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands"
let THandleParams {thVersion} = thParams'
clntServiceId = (\THClientService {serviceId} -> serviceId) <$> (peerClientService =<< thAuth thParams')
@@ -1495,7 +1495,9 @@ client
OFF -> response <$> maybe (pure $ err INTERNAL) suspendQueue_ q_
DEL -> response <$> maybe (pure $ err INTERNAL) delQueueAndMsgs q_
QUE -> withQueue $ \q qr -> (corrId,entId,) <$> getQueueInfo q qr
Cmd SRecipientService SUBS -> pure $ response $ err (CMD PROHIBITED) -- "TODO [certs rcv]"
Cmd SRecipientService SUBS -> response . (corrId,entId,) <$> case clntServiceId of
Just serviceId -> subscribeServiceMessages serviceId
Nothing -> pure $ ERR INTERNAL -- it's "internal" because it should never get to this branch
where
createQueue :: NewQueueReq -> M s (Transmission BrokerMsg)
createQueue NewQueueReq {rcvAuthKey, rcvDhKey, subMode, queueReqData, ntfCreds}
@@ -1615,11 +1617,13 @@ client
suspendQueue_ :: (StoreQueue s, QueueRec) -> M s (Transmission BrokerMsg)
suspendQueue_ (q, _) = liftIO $ either err (const ok) <$> suspendQueue (queueStore ms) q
-- TODO [certs rcv] if serviceId is passed, associate with the service and respond with SOK
subscribeQueueAndDeliver :: StoreQueue s -> QueueRec -> M s ResponseAndMessage
subscribeQueueAndDeliver q qr =
subscribeQueueAndDeliver q qr@QueueRec {rcvServiceId} =
liftIO (TM.lookupIO entId $ subscriptions clnt) >>= \case
Nothing -> subscribeRcvQueue qr >>= deliver False
Nothing ->
sharedSubscribeQueue q SRecipientService rcvServiceId subscribers subscriptions serviceSubsCount (newSubscription NoSub) rcvServices >>= \case
Left e -> pure (err e, Nothing)
Right s -> deliver s
Just s@Sub {subThread} -> do
stats <- asks serverStats
case subThread of
@@ -1629,27 +1633,29 @@ client
pure (err (CMD PROHIBITED), Nothing)
_ -> do
incStat $ qSubDuplicate stats
atomically (writeTVar (delivered s) Nothing) >> deliver True s
atomically (writeTVar (delivered s) Nothing) >> deliver (True, Just s)
where
deliver :: Bool -> Sub -> M s ResponseAndMessage
deliver hasSub sub = do
deliver :: (Bool, Maybe Sub) -> M s ResponseAndMessage
deliver (hasSub, sub_) = do
stats <- asks serverStats
fmap (either ((,Nothing) . err) id) $ liftIO $ runExceptT $ do
msg_ <- tryPeekMsg ms q
msg' <- forM msg_ $ \msg -> liftIO $ do
ts <- getSystemSeconds
sub <- maybe (atomically getSub) pure sub_
atomically $ setDelivered sub msg ts
unless hasSub $ incStat $ qSub stats
pure (NoCorrId, entId, MSG (encryptMsg qr msg))
pure ((corrId, entId, SOK clntServiceId), msg')
-- TODO [certs rcv] combine with subscribing ntf queues
subscribeRcvQueue :: QueueRec -> M s Sub
subscribeRcvQueue QueueRec {rcvServiceId} = atomically $ do
writeTQueue (subQ subscribers) (CSClient entId rcvServiceId Nothing, clientId)
sub <- newSubscription NoSub
TM.insert entId sub $ subscriptions clnt
pure sub
getSub :: STM Sub
getSub =
TM.lookup entId (subscriptions clnt) >>= \case
Just sub -> pure sub
Nothing -> do
sub <- newSubscription NoSub
TM.insert entId sub $ subscriptions clnt
pure sub
subscribeNewQueue :: RecipientId -> QueueRec -> M s ()
subscribeNewQueue rId QueueRec {rcvServiceId} = do
@@ -1719,74 +1725,131 @@ client
else liftIO (updateQueueTime (queueStore ms) q t) >>= either (pure . err') (action q)
subscribeNotifications :: StoreQueue s -> NtfCreds -> M s BrokerMsg
subscribeNotifications q NtfCreds {ntfServiceId} = do
subscribeNotifications q NtfCreds {ntfServiceId} =
sharedSubscribeQueue q SNotifierService ntfServiceId ntfSubscribers ntfSubscriptions ntfServiceSubsCount (pure ()) ntfServices >>= \case
Left e -> pure $ ERR e
Right (hasSub, _) -> do
when (isNothing clntServiceId) $
asks serverStats >>= incStat . (if hasSub then ntfSubDuplicate else ntfSub)
pure $ SOK clntServiceId
sharedSubscribeQueue ::
(PartyI p, ServiceParty p) =>
StoreQueue s ->
SParty p ->
Maybe ServiceId ->
ServerSubscribers s ->
(Client s -> TMap QueueId sub) ->
(Client s -> TVar Int64) ->
STM sub ->
(ServerStats -> ServiceStats) ->
M s (Either ErrorType (Bool, Maybe sub))
sharedSubscribeQueue q party queueServiceId srvSubscribers clientSubs clientServiceSubs mkSub servicesSel = do
stats <- asks serverStats
let incNtfSrvStat sel = incStat $ sel $ ntfServices stats
case clntServiceId of
let incSrvStat sel = incStat $ sel $ servicesSel stats
writeSub = writeTQueue (subQ srvSubscribers) (CSClient entId queueServiceId clntServiceId, clientId)
liftIO $ case clntServiceId of
Just serviceId
| ntfServiceId == Just serviceId -> do
| queueServiceId == Just serviceId -> do
-- duplicate queue-service association - can only happen in case of response error/timeout
hasSub <- atomically $ ifM hasServiceSub (pure True) (False <$ newServiceQueueSub)
hasSub <- atomically $ ifM hasServiceSub (pure True) (False <$ incServiceQueueSubs)
unless hasSub $ do
incNtfSrvStat srvSubCount
incNtfSrvStat srvSubQueues
incNtfSrvStat srvAssocDuplicate
pure $ SOK $ Just serviceId
| otherwise ->
atomically writeSub
incSrvStat srvSubCount
incSrvStat srvSubQueues
incSrvStat srvAssocDuplicate
pure $ Right (hasSub, Nothing)
| otherwise -> runExceptT $ do
-- new or updated queue-service association
liftIO (setQueueService (queueStore ms) q SNotifierService (Just serviceId)) >>= \case
Left e -> pure $ ERR e
Right () -> do
hasSub <- atomically $ (<$ newServiceQueueSub) =<< hasServiceSub
unless hasSub $ incNtfSrvStat srvSubCount
incNtfSrvStat srvSubQueues
incNtfSrvStat $ maybe srvAssocNew (const srvAssocUpdated) ntfServiceId
pure $ SOK $ Just serviceId
ExceptT $ setQueueService (queueStore ms) q party (Just serviceId)
hasSub <- atomically $ (<$ incServiceQueueSubs) =<< hasServiceSub
atomically writeSub
liftIO $ do
unless hasSub $ incSrvStat srvSubCount
incSrvStat srvSubQueues
incSrvStat $ maybe srvAssocNew (const srvAssocUpdated) queueServiceId
pure (hasSub, Nothing)
where
hasServiceSub = (0 /=) <$> readTVar ntfServiceSubsCount
-- This function is used when queue is associated with the service.
newServiceQueueSub = do
writeTQueue (subQ ntfSubscribers) (CSClient entId ntfServiceId (Just serviceId), clientId)
modifyTVar' ntfServiceSubsCount (+ 1) -- service count
modifyTVar' (totalServiceSubs ntfSubscribers) (+ 1) -- server count for all services
Nothing -> case ntfServiceId of
Just _ ->
liftIO (setQueueService (queueStore ms) q SNotifierService Nothing) >>= \case
Left e -> pure $ ERR e
Right () -> do
-- hasSubscription should never be True in this branch, because queue was associated with service.
-- So unless storage and session states diverge, this check is redundant.
hasSub <- atomically $ hasSubscription >>= newSub
incNtfSrvStat srvAssocRemoved
sok hasSub
hasServiceSub = (0 /=) <$> readTVar (clientServiceSubs clnt)
-- This function is used when queue association with the service is created.
incServiceQueueSubs = modifyTVar' (clientServiceSubs clnt) (+ 1) -- service count
Nothing -> case queueServiceId of
Just _ -> runExceptT $ do
ExceptT $ setQueueService (queueStore ms) q party Nothing
liftIO $ incSrvStat srvAssocRemoved
-- getSubscription may be Just for receiving service, where clientSubs also hold active deliveries for service subscriptions.
-- For notification service it can only be Just if storage and session states diverge.
r <- atomically $ getSubscription >>= newSub
atomically writeSub
pure r
Nothing -> do
hasSub <- atomically $ ifM hasSubscription (pure True) (newSub False)
sok hasSub
r@(hasSub, _) <- atomically $ getSubscription >>= newSub
unless hasSub $ atomically writeSub
pure $ Right r
where
hasSubscription = TM.member entId ntfSubscriptions
newSub hasSub = do
writeTQueue (subQ ntfSubscribers) (CSClient entId ntfServiceId Nothing, clientId)
unless (hasSub) $ TM.insert entId () ntfSubscriptions
pure hasSub
sok hasSub = do
incStat $ if hasSub then ntfSubDuplicate stats else ntfSub stats
pure $ SOK Nothing
getSubscription = TM.lookup entId $ clientSubs clnt
newSub = \case
Just sub -> pure (True, Just sub)
Nothing -> do
sub <- mkSub
TM.insert entId sub $ clientSubs clnt
pure (False, Just sub)
subscribeServiceMessages :: ServiceId -> M s BrokerMsg
subscribeServiceMessages serviceId =
sharedSubscribeService SRecipientService serviceId subscribers serviceSubscribed serviceSubsCount >>= \case
Left e -> pure $ ERR e
Right (hasSub, (count, idsHash)) -> do
unless hasSub $ forkClient clnt "deliverServiceMessages" $ liftIO $ deliverServiceMessages count
pure $ SOKS count idsHash
where
deliverServiceMessages expectedCnt = do
(qCnt, _msgCnt, _dupCnt, _errCnt) <- foldRcvServiceMessages ms serviceId deliverQueueMsg (0, 0, 0, 0)
atomically $ writeTBQueue msgQ [(NoCorrId, NoEntity, SALL)]
-- TODO [cert rcv] compare with expected
logNote $ "Service subscriptions for " <> tshow serviceId <> " (" <> tshow qCnt <> " queues)"
deliverQueueMsg :: (Int, Int, Int, Int) -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO (Int, Int, Int, Int)
deliverQueueMsg (!qCnt, !msgCnt, !dupCnt, !errCnt) rId = \case
Left e -> pure (qCnt + 1, msgCnt, dupCnt, errCnt + 1) -- TODO [certs rcv] deliver subscription error
Right qMsg_ -> case qMsg_ of
Nothing -> pure (qCnt + 1, msgCnt, dupCnt, errCnt)
Just (qr, msg) ->
atomically (getSubscription rId) >>= \case
Nothing -> pure (qCnt + 1, msgCnt, dupCnt + 1, errCnt)
Just sub -> do
ts <- getSystemSeconds
atomically $ setDelivered sub msg ts
atomically $ writeTBQueue msgQ [(NoCorrId, rId, MSG (encryptMsg qr msg))]
pure (qCnt + 1, msgCnt + 1, dupCnt, errCnt)
getSubscription rId =
TM.lookup rId (subscriptions clnt) >>= \case
-- If delivery subscription already exists, then there is no need to deliver message.
-- It may have been created when the message is sent after service subscription is created.
Just _sub -> pure Nothing
Nothing -> do
sub <- newSubscription NoSub
TM.insert rId sub $ subscriptions clnt
pure $ Just sub
subscribeServiceNotifications :: ServiceId -> M s BrokerMsg
subscribeServiceNotifications serviceId = do
subscribed <- readTVarIO ntfServiceSubscribed
if subscribed
then SOKS <$> readTVarIO ntfServiceSubsCount
else
liftIO (getServiceQueueCount @(StoreQueue s) (queueStore ms) SNotifierService serviceId) >>= \case
Left e -> pure $ ERR e
Right !count' -> do
subscribeServiceNotifications serviceId =
either ERR (uncurry SOKS . snd) <$> sharedSubscribeService SNotifierService serviceId ntfSubscribers ntfServiceSubscribed ntfServiceSubsCount
sharedSubscribeService :: (PartyI p, ServiceParty p) => SParty p -> ServiceId -> ServerSubscribers s -> (Client s -> TVar Bool) -> (Client s -> TVar Int64) -> M s (Either ErrorType (Bool, (Int64, IdsHash)))
sharedSubscribeService party serviceId srvSubscribers clientServiceSubscribed clientServiceSubs = do
subscribed <- readTVarIO $ clientServiceSubscribed clnt
liftIO $ runExceptT $
(subscribed,)
<$> if subscribed
then (,B.empty) <$> readTVarIO (clientServiceSubs clnt) -- TODO [certs rcv] get IDs hash
else do
count' <- ExceptT $ getServiceQueueCount @(StoreQueue s) (queueStore ms) party serviceId
incCount <- atomically $ do
writeTVar ntfServiceSubscribed True
count <- swapTVar ntfServiceSubsCount count'
writeTVar (clientServiceSubscribed clnt) True
count <- swapTVar (clientServiceSubs clnt) count'
pure $ count' - count
atomically $ writeTQueue (subQ ntfSubscribers) (CSService serviceId incCount, clientId)
pure $ SOKS count'
atomically $ writeTQueue (subQ srvSubscribers) (CSService serviceId incCount, clientId)
pure (count', B.empty) -- TODO [certs rcv] get IDs hash
acknowledgeMsg :: MsgId -> StoreQueue s -> QueueRec -> M s (Transmission BrokerMsg)
acknowledgeMsg msgId q qr =
@@ -1904,10 +1967,13 @@ client
tryDeliverMessage msg =
-- the subscribed client var is read outside of STM to avoid transaction cost
-- in case no client is subscribed.
getSubscribedClient rId (queueSubscribers subscribers)
getSubscribed
$>>= deliverToSub
>>= mapM_ forkDeliver
where
getSubscribed = case rcvServiceId qr of
Just serviceId -> getSubscribedClient serviceId $ serviceSubscribers subscribers
Nothing -> getSubscribedClient rId $ queueSubscribers subscribers
rId = recipientId q
deliverToSub rcv = do
ts <- getSystemSeconds
@@ -1918,6 +1984,7 @@ client
-- the new client will receive message in response to SUB.
readTVar rcv
$>>= \rc@Client {subscriptions = subs, sndQ = sndQ'} -> TM.lookup rId subs
>>= maybe (newServiceDeliverySub subs) (pure . Just)
$>>= \s@Sub {subThread, delivered} -> case subThread of
ProhibitSub -> pure Nothing
ServerSub st -> readTVar st >>= \case
@@ -1930,6 +1997,12 @@ client
(writeTVar st SubPending $> Just (rc, s, st))
(deliver sndQ' s ts $> Nothing)
_ -> pure Nothing
newServiceDeliverySub subs
| isJust (rcvServiceId qr) = do
sub <- newSubscription NoSub
TM.insert rId sub subs
pure $ Just sub
| otherwise = pure Nothing
deliver sndQ' s ts = do
let encMsg = encryptMsg qr msg
writeTBQueue sndQ' ([(NoCorrId, rId, MSG encMsg)], [])
@@ -2051,6 +2124,7 @@ client
-- we delete subscription here, so the client with no subscriptions can be disconnected.
sub <- atomically $ TM.lookupDelete entId $ subscriptions clnt
liftIO $ mapM_ cancelSub sub
when (isJust rcvServiceId) $ atomically $ modifyTVar' (serviceSubsCount clnt) $ \n -> max 0 (n - 1)
atomically $ writeTQueue (subQ subscribers) (CSDeleted entId rcvServiceId, clientId)
forM_ (notifier qr) $ \NtfCreds {notifierId = nId, ntfServiceId} -> do
-- queue is deleted by a different client from the one subscribed to notifications,
@@ -444,6 +444,26 @@ instance MsgStoreClass (JournalMsgStore s) where
getLoadedQueue :: JournalQueue s -> IO (JournalQueue s)
getLoadedQueue q = fromMaybe q <$> TM.lookupIO (recipientId q) (loadedQueues $ queueStore_ ms)
foldRcvServiceMessages :: JournalMsgStore s -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a
foldRcvServiceMessages ms serviceId f acc = case queueStore_ ms of
MQStore st -> foldRcvServiceQueues st serviceId f' acc
where
f' a (q, qr) = runExceptT (tryPeekMsg ms q) >>= f a (recipientId q) . ((qr,) <$$>)
#if defined(dbServerPostgres)
PQStore st -> foldRcvServiceQueueRecs st serviceId f' acc
where
JournalMsgStore {queueLocks, sharedLock} = ms
f' a (rId, qr) = do
q <- mkQueue ms False rId qr
qMsg_ <-
withSharedWaitLock rId queueLocks sharedLock $ runExceptT $ tryStore' "foldRcvServiceMessages" rId $
(qr,) . snd <$$> (getLoadedQueue q >>= unStoreIO . getPeekMsgQueue ms)
f a rId qMsg_
-- Use cached queue if available.
-- Also see the comment in loadQueue in PostgresQueueStore
getLoadedQueue q = fromMaybe q <$> TM.lookupIO (recipientId q) (loadedQueues $ queueStore_ ms)
#endif
logQueueStates :: JournalMsgStore s -> IO ()
logQueueStates ms = withActiveMsgQueues ms $ unStoreIO . logQueueState
@@ -119,6 +119,34 @@ instance MsgStoreClass PostgresMsgStore where
toMessageStats (expiredMsgsCount, storedMsgsCount, storedQueues) =
MessageStats {expiredMsgsCount, storedMsgsCount, storedQueues}
foldRcvServiceMessages :: PostgresMsgStore -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a
foldRcvServiceMessages ms serviceId f acc =
withTransaction (dbStore $ queueStore_ ms) $ \db ->
DB.fold
db
[sql|
SELECT q.recipient_id, q.recipient_keys, q.rcv_dh_secret,
q.sender_id, q.sender_key, q.queue_mode,
q.notifier_id, q.notifier_key, q.rcv_ntf_dh_secret, q.ntf_service_id,
q.status, q.updated_at, q.link_id, q.rcv_service_id,
m.msg_id, m.msg_ts, m.msg_quota, m.msg_ntf_flag, m.msg_body
FROM msg_queues q
LEFT JOIN (
SELECT recipient_id, msg_id, msg_ts, msg_quota, msg_ntf_flag, msg_body,
ROW_NUMBER() OVER (PARTITION BY recipient_id ORDER BY message_id ASC) AS row_num
FROM messages
) m ON q.recipient_id = m.recipient_id AND m.row_num = 1
WHERE q.rcv_service_id = ? AND q.deleted_at IS NULL;
|]
(Only serviceId)
acc
f'
where
f' a (qRow :. mRow) =
let (rId, qr) = rowToQueueRec qRow
msg_ = toMaybeMessage mRow
in f a rId $ Right ((qr,) <$> msg_)
logQueueStates _ = error "logQueueStates not used"
logQueueState _ = error "logQueueState not used"
@@ -247,6 +275,11 @@ uninterruptibleMask_ :: ExceptT ErrorType IO a -> ExceptT ErrorType IO a
uninterruptibleMask_ = ExceptT . E.uninterruptibleMask_ . runExceptT
{-# INLINE uninterruptibleMask_ #-}
toMaybeMessage :: (Maybe (Binary MsgId), Maybe Int64, Maybe Bool, Maybe Bool, Maybe (Binary MsgBody)) -> Maybe Message
toMaybeMessage = \case
(Just msgId, Just ts, Just msgQuota, Just ntf, Just body) -> Just $ toMessage (msgId, ts, msgQuota, ntf, body)
_ -> Nothing
toMessage :: (Binary MsgId, Int64, Bool, Bool, Binary MsgBody) -> Message
toMessage (Binary msgId, ts, msgQuota, ntf, Binary body)
| msgQuota = MessageQuota {msgId, msgTs}
@@ -87,6 +87,11 @@ instance MsgStoreClass STMMsgStore where
expireOldMessages _tty ms now ttl =
withLoadedQueues (queueStore_ ms) $ atomically . expireQueueMsgs ms now (now - ttl)
foldRcvServiceMessages :: STMMsgStore -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a
foldRcvServiceMessages ms serviceId f=
foldRcvServiceQueues (queueStore_ ms) serviceId $ \a (q, qr) ->
runExceptT (tryPeekMsg ms q) >>= f a (recipientId q) . ((qr,) <$$>)
logQueueStates _ = pure ()
{-# INLINE logQueueStates #-}
logQueueState _ = pure ()
@@ -45,6 +45,7 @@ class (Monad (StoreMonad s), QueueStoreClass (StoreQueue s) (QueueStore s)) => M
unsafeWithAllMsgQueues :: Monoid a => Bool -> s -> (StoreQueue s -> IO a) -> IO a
-- tty, store, now, ttl
expireOldMessages :: Bool -> s -> Int64 -> Int64 -> IO MessageStats
foldRcvServiceMessages :: s -> ServiceId -> (a -> RecipientId -> Either ErrorType (Maybe (QueueRec, Message)) -> IO a) -> a -> IO a
logQueueStates :: s -> IO ()
logQueueState :: StoreQueue s -> StoreMonad s ()
queueStore :: s -> QueueStore s
@@ -24,9 +24,11 @@ module Simplex.Messaging.Server.QueueStore.Postgres
batchInsertServices,
batchInsertQueues,
foldServiceRecs,
foldRcvServiceQueueRecs,
foldQueueRecs,
foldRecentQueueRecs,
handleDuplicate,
rowToQueueRec,
withLog_,
withDB,
withDB',
@@ -577,12 +579,17 @@ insertServiceQuery =
VALUES (?,?,?,?,?)
|]
foldServiceRecs :: forall a q. Monoid a => PostgresQueueStore q -> (ServiceRec -> IO a) -> IO a
foldServiceRecs :: Monoid a => PostgresQueueStore q -> (ServiceRec -> IO a) -> IO a
foldServiceRecs st f =
withTransaction (dbStore st) $ \db ->
DB.fold_ db "SELECT service_id, service_role, service_cert, service_cert_hash, created_at FROM services" mempty $
\ !acc -> fmap (acc <>) . f . rowToServiceRec
foldRcvServiceQueueRecs :: PostgresQueueStore q -> ServiceId -> (a -> (RecipientId, QueueRec) -> IO a) -> a -> IO a
foldRcvServiceQueueRecs st serviceId f acc =
withTransaction (dbStore st) $ \db ->
DB.fold db (queueRecQuery <> " WHERE rcv_service_id = ? AND deleted_at IS NULL") (Only serviceId) acc $ \a -> f a . rowToQueueRec
foldQueueRecs :: Monoid a => Bool -> Bool -> PostgresQueueStore q -> ((RecipientId, QueueRec) -> IO a) -> IO a
foldQueueRecs withData = foldQueueRecs_ foldRecs
where
@@ -769,10 +776,6 @@ instance ToField SMPServiceRole where toField = toField . decodeLatin1 . smpEnco
instance FromField SMPServiceRole where fromField = fromTextField_ $ eitherToMaybe . smpDecode . encodeUtf8
instance ToField X.CertificateChain where toField = toField . Binary . smpEncode . C.encodeCertChain
instance FromField X.CertificateChain where fromField = blobFieldDecoder (parseAll C.certChainP)
#if !defined(dbPostgres)
instance ToField EntityId where toField (EntityId s) = toField $ Binary s
@@ -797,4 +800,8 @@ deriving newtype instance FromField EncDataBytes
deriving newtype instance ToField (RoundedSystemTime t)
deriving newtype instance FromField (RoundedSystemTime t)
instance ToField X.CertificateChain where toField = toField . Binary . smpEncode . C.encodeCertChain
instance FromField X.CertificateChain where fromField = blobFieldDecoder (parseAll C.certChainP)
#endif
+12 -1
View File
@@ -17,6 +17,7 @@
module Simplex.Messaging.Server.QueueStore.STM
( STMQueueStore (..),
STMService (..),
foldRcvServiceQueues,
setStoreLog,
withLog',
readQueueRecIO,
@@ -45,7 +46,7 @@ import Simplex.Messaging.SystemTime
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (SMPServiceRole (..))
import Simplex.Messaging.Util (anyM, ifM, tshow, ($>>), ($>>=), (<$$))
import Simplex.Messaging.Util (anyM, ifM, tshow, ($>>), ($>>=), (<$$), (<$$>))
import System.IO
import UnliftIO.STM
@@ -359,6 +360,16 @@ instance StoreQueueClass q => QueueStoreClass q (STMQueueStore q) where
SRecipientService -> serviceRcvQueues
SNotifierService -> serviceNtfQueues
foldRcvServiceQueues :: StoreQueueClass q => STMQueueStore q -> ServiceId -> (a -> (q, QueueRec) -> IO a) -> a -> IO a
foldRcvServiceQueues st serviceId f acc =
TM.lookupIO serviceId (services st) >>= \case
Nothing -> pure acc
Just s ->
readTVarIO (serviceRcvQueues s)
>>= foldM (\a -> get >=> maybe (pure a) (f a)) acc
where
get rId = TM.lookupIO rId (queues st) $>>= \q -> (q,) <$$> readTVarIO (queueRec q)
withQueueRec :: TVar (Maybe QueueRec) -> (QueueRec -> STM a) -> IO (Either ErrorType a)
withQueueRec qr a = atomically $ readQueueRec qr >>= mapM a
+15 -8
View File
@@ -56,6 +56,7 @@ module Simplex.Messaging.Transport
serviceCertsSMPVersion,
newNtfCredsSMPVersion,
clientNoticesSMPVersion,
rcvServiceSMPVersion,
simplexMQVersion,
smpBlockSize,
TransportConfig (..),
@@ -170,6 +171,7 @@ smpBlockSize = 16384
-- 16 - service certificates (5/31/2025)
-- 17 - create notification credentials with NEW (7/12/2025)
-- 18 - support client notices (10/10/2025)
-- 19 - service subscriptions to messages (10/20/2025)
data SMPVersion
@@ -218,6 +220,9 @@ newNtfCredsSMPVersion = VersionSMP 17
clientNoticesSMPVersion :: VersionSMP
clientNoticesSMPVersion = VersionSMP 18
rcvServiceSMPVersion :: VersionSMP
rcvServiceSMPVersion = VersionSMP 19
minClientSMPRelayVersion :: VersionSMP
minClientSMPRelayVersion = VersionSMP 6
@@ -225,13 +230,13 @@ minServerSMPRelayVersion :: VersionSMP
minServerSMPRelayVersion = VersionSMP 6
currentClientSMPRelayVersion :: VersionSMP
currentClientSMPRelayVersion = VersionSMP 18
currentClientSMPRelayVersion = VersionSMP 19
legacyServerSMPRelayVersion :: VersionSMP
legacyServerSMPRelayVersion = VersionSMP 6
currentServerSMPRelayVersion :: VersionSMP
currentServerSMPRelayVersion = VersionSMP 18
currentServerSMPRelayVersion = VersionSMP 19
-- Max SMP protocol version to be used in e2e encrypted
-- connection between client and server, as defined by SMP proxy.
@@ -239,7 +244,7 @@ currentServerSMPRelayVersion = VersionSMP 18
-- to prevent client version fingerprinting by the
-- destination relays when clients upgrade at different times.
proxiedSMPRelayVersion :: VersionSMP
proxiedSMPRelayVersion = VersionSMP 17
proxiedSMPRelayVersion = VersionSMP 18
-- minimal supported protocol version is 6
-- TODO remove code that supports sending commands without batching
@@ -823,7 +828,7 @@ smpClientHandshake c ks_ keyHash@(C.KeyHash kh) vRange proxyServer serviceKeys_
serviceKeys = case serviceKeys_ of
Just sks | v >= serviceCertsSMPVersion && certificateSent c -> Just sks
_ -> Nothing
clientService = mkClientService <$> serviceKeys
clientService = mkClientService v =<< serviceKeys
hs = SMPClientHandshake {smpVersion = v, keyHash, authPubKey = fst <$> ks_, proxyServer, clientService}
sendHandshake th hs
service <- mapM getClientService serviceKeys
@@ -831,10 +836,12 @@ smpClientHandshake c ks_ keyHash@(C.KeyHash kh) vRange proxyServer serviceKeys_
Nothing -> throwE TEVersion
where
th@THandle {params = THandleParams {sessionId}} = smpTHandle c
mkClientService :: (ServiceCredentials, C.KeyPairEd25519) -> SMPClientHandshakeService
mkClientService (ServiceCredentials {serviceRole, serviceCreds, serviceSignKey}, (k, _)) =
let sk = C.signX509 serviceSignKey $ C.publicToX509 k
in SMPClientHandshakeService {serviceRole, serviceCertKey = CertChainPubKey (fst serviceCreds) sk}
mkClientService :: VersionSMP -> (ServiceCredentials, C.KeyPairEd25519) -> Maybe SMPClientHandshakeService
mkClientService v (ServiceCredentials {serviceRole, serviceCreds, serviceSignKey}, (k, _))
| serviceRole == SRMessaging && v < rcvServiceSMPVersion = Nothing
| otherwise =
let sk = C.signX509 serviceSignKey $ C.publicToX509 k
in Just SMPClientHandshakeService {serviceRole, serviceCertKey = CertChainPubKey (fst serviceCreds) sk}
getClientService :: (ServiceCredentials, C.KeyPairEd25519) -> ExceptT TransportError IO THClientService
getClientService (ServiceCredentials {serviceRole, serviceCertHash}, (_, pk)) =
getHandshake th >>= \case