core, ui: relay management - add, remove relays, synchronization to relay list (#6917)

This commit is contained in:
spaced4ndy
2026-05-08 07:19:16 +00:00
committed by GitHub
parent d9cfc9bd3d
commit 6f8a07e4ea
44 changed files with 1861 additions and 182 deletions
+111 -25
View File
@@ -1778,11 +1778,14 @@ processChatCommand vr nm = \case
gInfo@GroupInfo {groupProfile = p, groupSummary = GroupSummary {publicMemberCount = localCount}} <- withFastStore $ \db -> getGroupInfo db vr user groupId
case p of
GroupProfile {publicGroup = Just PublicGroupProfile {groupLink = sLnk}} | useRelays' gInfo -> do
(_, cData) <- getShortLinkConnReq nm user sLnk
(_, cData@(ContactLinkData _ UserContactData {relays = currentRelayLinks})) <- getShortLinkConnReq' nm user sLnk
groupSLinkData_ <- liftIO $ decodeLinkUserData cData
gInfo' <- case groupSLinkData_ of
Just sLinkData -> fst <$> updateGroupFromLinkData user gInfo sLinkData
_ -> pure gInfo
when (memberRole' (membership gInfo) /= GROwner && memberCurrent (membership gInfo)) $
withGroupLock "syncSubscriberRelays" groupId $
syncSubscriberRelays user gInfo' currentRelayLinks
pure $ CRGroupInfo user gInfo'
_ -> throwCmdError "group link data not available"
APIGroupMemberInfo gId gMemberId -> withUser $ \user -> do
@@ -2135,7 +2138,7 @@ processChatCommand vr nm = \case
_ -> Nothing
void $ createLinkOwnerMember db vr user gInfo' ctId_ (MemberId ownerId) ownerKey
pure gInfo'
rs <- mapConcurrently (connectToRelay gInfo') relays
rs <- mapConcurrently (connectToRelay user gInfo') relays
let relayFailed = \case (_, _, Left _) -> True; _ -> False
(failed, succeeded) = partition relayFailed rs
if null succeeded
@@ -2162,23 +2165,6 @@ processChatCommand vr nm = \case
isTempErr = \case
(_, _, Left ChatErrorAgent {agentError = e}) -> temporaryOrHostError e
_ -> False
connectToRelay gInfo' relayLink = do
gVar <- asks random
-- Save relayLink to re-use relay member record on retry (check by relayLink)
relayMember <- withFastStore $ \db -> getCreateRelayForMember db vr gVar user gInfo' relayLink
r <- tryAllErrors $ do
(fd@FixedLinkData {rootKey = relayKey, linkEntityId}, cData) <- getShortLinkConnReq nm user relayLink
relayLinkData_ <- liftIO $ decodeLinkUserData cData
case (relayLinkData_, linkEntityId) of
(Just RelayShortLinkData {relayProfile = p}, Just entityId) ->
withFastStore $ \db -> updateRelayMemberData db user relayMember (MemberId entityId) (MemberKey relayKey) p
_ -> throwChatError $ CEException "relay link: no relay link data or entity id"
let cReq = linkConnReq fd
relayLinkToConnect = CCLink cReq (Just relayLink)
void $ connectViaContact user (Just $ PCEGroup gInfo' relayMember) incognito relayLinkToConnect Nothing Nothing
-- Re-read member to get updated activeConn and updated data (from updateRelayMemberData)
relayMember' <- withFastStore $ \db -> getGroupMember db vr user groupId (groupMemberId' relayMember)
pure (relayLink, relayMember', r)
retryRelayConnectionAsync gInfo' relayLink relayMember@GroupMember {activeConn} = do
forM_ activeConn $ \conn -> do
deleteAgentConnectionAsync $ aConnId conn
@@ -2547,6 +2533,37 @@ processChatCommand vr nm = \case
relays <- liftIO $ getGroupRelays db gInfo
pure (gInfo, relays)
pure $ CRGroupRelays user gInfo relays
APIAddGroupRelays groupId relayIds -> withUser $ \user -> withGroupLock "addGroupRelays" groupId $ do
(gInfo, existingRelays) <- withFastStore $ \db -> do
gi <- getGroupInfo db vr user groupId
rs <- liftIO $ getGroupRelays db gi
pure (gi, rs)
assertUserGroupRole gInfo GROwner
unless (useRelays' gInfo) $ throwCmdError "group does not use relays"
let existingRelayIds = map (\GroupRelay {userChatRelay = UserChatRelay {chatRelayId = DBEntityId rId}} -> rId) existingRelays
when (any (`elem` existingRelayIds) relayIds) $ throwCmdError "some relays are already in the group"
gLink@GroupLink {connLinkContact = ccLink} <- withFastStore $ \db -> getGroupLink db user gInfo
sLnk <- case connShortLink' ccLink of
Just sl -> pure sl
Nothing -> throwChatError $ CEException "group link has no short link"
relays <- withFastStore $ \db -> mapM (getChatRelayById db user) (L.toList relayIds)
results <- addRelays user gInfo sLnk relays
case partitionEithers (map snd results) of
([], _) -> do
relays' <- withFastStore $ \db -> liftIO $ getGroupRelays db gInfo
pure $ CRGroupRelaysAdded user gInfo gLink relays'
(errors@(e : _), _) -> do
if all isTempErr errors
then throwError e
else do
let toRelayResult (r, Left e') = AddRelayResult r (Just e')
toRelayResult (r, Right _) = AddRelayResult r Nothing
pure $ CRGroupRelaysAddFailed user (map toRelayResult results)
where
isTempErr :: ChatError -> Bool
isTempErr = \case
ChatErrorAgent {agentError = e} -> temporaryOrHostError e
_ -> False
APIAddMember groupId contactId memRole -> withUser $ \user -> withGroupLock "addMember" groupId $ do
-- TODO for large groups: no need to load all members to determine if contact is a member
(group, contact) <- withFastStore $ \db -> (,) <$> getGroup db vr user groupId <*> getContact db vr user contactId
@@ -3577,6 +3594,44 @@ processChatCommand vr nm = \case
ct' <- withStore $ \db -> getContact db vr user contactId
pure $ CRSentInvitationToContact user ct' incognitoProfile
_ -> throwCmdError "contact already has connection"
connectToRelay :: User -> GroupInfo -> ShortLinkContact -> CM (ShortLinkContact, GroupMember, Either ChatError ())
connectToRelay user gInfo relayLink = do
gVar <- asks random
-- Save relayLink to re-use relay member record on retry (check by relayLink)
relayMember <- withFastStore $ \db -> getCreateRelayForMember db vr gVar user gInfo relayLink
r <- tryAllErrors $ do
(fd@FixedLinkData {rootKey = relayKey, linkEntityId}, cData) <- getShortLinkConnReq nm user relayLink
relayLinkData_ <- liftIO $ decodeLinkUserData cData
case (relayLinkData_, linkEntityId) of
(Just RelayShortLinkData {relayProfile = p}, Just entityId) ->
withFastStore $ \db -> updateRelayMemberData db user relayMember (MemberId entityId) (MemberKey relayKey) p
_ -> throwChatError $ CEException "relay link: no relay link data or entity id"
let cReq = linkConnReq fd
relayLinkToConnect = CCLink cReq (Just relayLink)
void $ connectViaContact user (Just $ PCEGroup gInfo relayMember) (incognitoMembership gInfo) relayLinkToConnect Nothing Nothing
relayMember' <- withFastStore $ \db -> getGroupMember db vr user (groupId' gInfo) (groupMemberId' relayMember)
pure (relayLink, relayMember', r)
syncSubscriberRelays :: User -> GroupInfo -> [ShortLinkContact] -> CM ()
syncSubscriberRelays user gInfo currentRelayLinks = void . tryAllErrors $ do
localRelayMembers <- withFastStore' $ \db -> getGroupRelayMembers db vr user gInfo
let activeRelayMembers = filter memberCurrent localRelayMembers
memberRelayLink GroupMember {relayLink = rl} = rl
localRelayLinks = mapMaybe memberRelayLink activeRelayMembers
newRelayLinks = filter (`notElem` localRelayLinks) currentRelayLinks
forM_ newRelayLinks $ \rlnk -> void . tryAllErrors $
connectToRelay user gInfo rlnk
forM_ localRelayMembers $ \m ->
case memberRelayLink m of
-- Remove relay if its link is no longer in the current link data.
-- Inactive relays (e.g. left) are only cleaned up when no active relays remain,
-- as that is the only case where the owner's relay removal can't be forwarded.
Just rlnk | rlnk `notElem` currentRelayLinks,
memberCurrent m || null activeRelayMembers ->
void . tryAllErrors $ do
deleteMemberConnection m
deleteOrUpdateMemberRecord user gInfo m
_ -> pure ()
prepareContact :: User -> ConnReqContact -> PQSupport -> CM (ConnId, VersionChat)
prepareContact user cReq pqSup = do
-- 0) toggle disabled - PQSupportOff
@@ -4727,12 +4782,42 @@ deleteInProgressGroup user gInfo = do
withFastStore' $ \db -> deleteGroup db user gInfo
runRelayGroupLinkChecks :: User -> CM ()
runRelayGroupLinkChecks _user = do
-- TODO [relays] relay: periodically check presence of relay link in group links of served groups
-- TODO - retrieve group link data
-- TODO - if relay link is present, update relay status to RSActive
-- TODO - if relay link is absent and status was RSActive -> update to new "Removed" status?
pure ()
runRelayGroupLinkChecks user = do
initialDelay <- asks (relayChecksInitialDelay . config)
liftIO $ threadDelay' initialDelay
interval <- asks (relayChecksInterval . config)
forever $ do
flip catchAllErrors eToView $ do
lift waitChatStartedAndActivated
checkRelayServedGroups
checkRelayInactiveGroups
liftIO $ threadDelay' $ diffToMicroseconds interval
where
checkRelayServedGroups = do
vr <- chatVersionRange
relayGroups <- withStore' $ \db -> getRelayServedGroups db vr user
forM_ relayGroups $ \gInfo@GroupInfo {groupProfile = gp} -> flip catchAllErrors eToView $ do
case publicGroup gp of
Just PublicGroupProfile {groupLink = sLnk} -> do
(_, ContactLinkData _ UserContactData {relays = relayLinks}) <-
getShortLinkConnReq' NRMBackground user sLnk
gLink_ <- withStore' $ \db -> runExceptT $ getGroupLink db user gInfo
case gLink_ of
Right GroupLink {connLinkContact = CCLink _ (Just ourLink)} ->
if ourLink `elem` relayLinks
then do
-- TODO [relays] emit event to UI when relay own status promoted to RSActive
-- CEvtGroupRelayUpdated requires GroupRelay (owner-side), not available on relay side
void $ withStore' $ \db -> updateRelayOwnStatusFromTo db gInfo RSAccepted RSActive
else void $ withStore' $ \db -> updateRelayOwnStatusFromTo db gInfo RSActive RSInactive
_ -> pure ()
_ -> pure ()
checkRelayInactiveGroups = do
vr <- chatVersionRange
ttl <- asks (relayInactiveTTL . config)
inactiveGroups <- withStore' $ \db -> getRelayInactiveGroups db vr user ttl
forM_ inactiveGroups $ \gInfo -> flip catchAllErrors eToView $
deleteGroupConnections user gInfo False
expireChatItems :: User -> Int64 -> Bool -> CM ()
expireChatItems user@User {userId} globalTTL sync = do
@@ -5026,6 +5111,7 @@ chatCommandP =
("/public group" <|> "/pg") *> (NewPublicGroup <$> incognitoP <* " relays=" <*> strP <* A.space <* char_ '#' <*> channelProfile),
"/_public group " *> (APINewPublicGroup <$> A.decimal <*> incognitoOnOffP <*> _strP <* A.space <*> jsonP),
"/_get relays #" *> (APIGetGroupRelays <$> A.decimal),
"/_add relays #" *> (APIAddGroupRelays <$> A.decimal <*> _strP),
("/add " <|> "/a ") *> char_ '#' *> (AddMember <$> displayNameP <* A.space <* char_ '@' <*> displayNameP <*> (memberRole <|> pure GRMember)),
("/join " <|> "/j ") *> char_ '#' *> (JoinGroup <$> displayNameP <*> (" mute" $> MFNone <|> pure MFAll)),
"/accept member " *> char_ '#' *> (AcceptMember <$> displayNameP <* A.space <* char_ '@' <*> displayNameP <*> (memberRole <|> pure GRMember)),
+13 -3
View File
@@ -1808,9 +1808,12 @@ deleteOrUpdateMemberRecord user gInfo m =
deleteOrUpdateMemberRecordIO :: DB.Connection -> User -> GroupInfo -> GroupMember -> IO GroupInfo
deleteOrUpdateMemberRecordIO db user@User {userId} gInfo m = do
(gInfo', m') <- deleteSupportChatIfExists db user gInfo m
checkGroupMemberHasItems db user m' >>= \case
Just _ -> updateGroupMemberStatus db userId m' GSMemRemoved
Nothing -> deleteGroupMember db user m'
if isRelay m'
then deleteGroupMember db user m'
else
checkGroupMemberHasItems db user m' >>= \case
Just _ -> updateGroupMemberStatus db userId m' GSMemRemoved
Nothing -> deleteGroupMember db user m'
pure gInfo'
updateMemberRecordDeleted :: User -> GroupInfo -> GroupMember -> GroupMemberStatus -> CM GroupInfo
@@ -1818,8 +1821,15 @@ updateMemberRecordDeleted user@User {userId} gInfo m newStatus =
withStore' $ \db -> do
(gInfo', m') <- deleteSupportChatIfExists db user gInfo m
updateGroupMemberStatus db userId m' newStatus
deactivateRelay_ db m
pure gInfo'
deactivateRelay_ :: DB.Connection -> GroupMember -> IO ()
deactivateRelay_ db m =
when (isRelay m) $ do
relay_ <- runExceptT $ getGroupRelayByGMId db (groupMemberId' m)
forM_ relay_ $ \relay -> void $ updateRelayStatus db relay RSInactive
deleteSupportChatIfExists :: DB.Connection -> User -> GroupInfo -> GroupMember -> IO (GroupInfo, GroupMember)
deleteSupportChatIfExists db user gInfo m = do
gInfo' <-
+49 -23
View File
@@ -931,7 +931,11 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
newDeliveryTasks <- reverse <$> foldM (processAChatMsg gInfo' scopeInfo m' tags eInfo) [] aChatMsgs
shouldDelConns <-
if isUserGrpFwdRelay gInfo' && not (blockedByAdmin m)
then createDeliveryTasks gInfo' m' newDeliveryTasks
then
let tasks
| relayOwnStatus gInfo' == Just RSInactive = filter relayRemovedNewTask newDeliveryTasks
| otherwise = newDeliveryTasks
in createDeliveryTasks gInfo' m' tasks
else pure False
withRcpt <- checkSendRcpt $ rights aChatMsgs
pure (withRcpt, shouldDelConns)
@@ -1039,6 +1043,8 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
where
aChatMsgHasReceipt (APMsg _ (ParsedMsg _ _ ChatMessage {chatMsgEvent})) =
hasDeliveryReceipt (toCMEventTag chatMsgEvent)
relayRemovedNewTask :: NewMessageDeliveryTask -> Bool
relayRemovedNewTask NewMessageDeliveryTask {taskContext = DeliveryTaskContext {jobScope}} = isRelayRemoved jobScope
createDeliveryTasks :: GroupInfo -> GroupMember -> [NewMessageDeliveryTask] -> CM ShouldDeleteGroupConns
createDeliveryTasks gInfo'@GroupInfo {groupId = gId} m' newDeliveryTasks = do
let relayRemovedTask_ = find (\NewMessageDeliveryTask {taskContext = DeliveryTaskContext {jobScope}} -> isRelayRemoved jobScope) newDeliveryTasks
@@ -1306,8 +1312,6 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
pure (gInfo, gLink, relays', changed)
toView $ CEvtGroupLinkDataUpdated user gInfo gLink relays relaysChanged
where
-- TODO [relays] owner: on relay deletion (link absent from relayLinks)
-- TODO move status RSActive to new "Removed" status / remove relay record
updateRelay :: DB.Connection -> GroupRelay -> ([GroupRelay], Bool) -> IO ([GroupRelay], Bool)
updateRelay db relay@GroupRelay {relayLink, relayStatus} (acc, changed) =
case relayLink of
@@ -1315,6 +1319,16 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
| rLink `elem` relayLinks && relayStatus == RSAccepted -> do
relay' <- updateRelayStatus db relay RSActive
pure (relay' : acc, True)
| rLink `elem` relayLinks -> pure (relay : acc, changed)
| relayStatus == RSActive -> do
-- Relay link absent from link data — deactivate.
-- RSAccepted relays are not deactivated: their own link data update
-- may not have been processed yet (race with concurrent relay connections).
-- TODO [relays] multi-owner: Another owner removing a relay updates link data on
-- TODO the SMP server, but this owner won't receive a LINK callback for it
-- TODO (LINK only fires in response to own setConnShortLink calls).
relay' <- updateRelayStatus db relay RSInactive
pure (relay' : acc, True)
_ -> pure (relay : acc, changed)
_ -> throwChatError $ CECommandError "LINK event expected for a group link only"
_ -> throwChatError $ CECommandError "unexpected cmdFunction"
@@ -3096,10 +3110,12 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
deleteGroupLinkIfExists user gInfo
-- TODO [relays] possible improvement is to immediately delete rcv queues if isUserGrpFwdRelay
unless (isUserGrpFwdRelay gInfo) $ deleteGroupConnections user gInfo False
withStore' $ \db -> updateGroupMemberStatus db userId membership GSMemRemoved
withStore' $ \db -> do
updateGroupMemberStatus db userId membership GSMemRemoved
when (isJust $ relayOwnStatus gInfo) $ updateRelayOwnStatus_ db gInfo RSInactive
let membership' = membership {memberStatus = GSMemRemoved}
when withMessages $ deleteMessages gInfo membership' SMDSnd
deleteMemberItem gInfo RGEUserDeleted
deleteMemberItem msg gInfo RGEUserDeleted
toView $ CEvtDeletedMemberUser user gInfo {membership = membership'} m withMessages msgSigned
pure $ Just DJSGroup {jobSpec = DJRelayRemoved}
else
@@ -3127,7 +3143,11 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
let wasDeleted = memberStatus == GSMemRemoved || memberStatus == GSMemLeft
deletedMember' = deletedMember {memberStatus = GSMemRemoved}
when withMessages $ deleteMessages gInfo'' deletedMember' SMDRcv
unless wasDeleted $ deleteMemberItem gInfo'' $ RGEMemberDeleted groupMemberId (fromLocalProfile memberProfile)
-- Clear forwardedByMember if it references the deleted member,
-- as the member record was already deleted above.
let RcvMessage {forwardedByMember = fwdBy} = msg
msg' = if fwdBy == Just groupMemberId then (msg :: RcvMessage) {forwardedByMember = Nothing} else msg
unless wasDeleted $ deleteMemberItem msg' gInfo'' $ RGEMemberDeleted groupMemberId (fromLocalProfile memberProfile)
toView $ CEvtDeletedMember user gInfo'' m deletedMember' withMessages msgSigned
pure deliveryScope
where
@@ -3135,9 +3155,9 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
| senderRole < GRAdmin || senderRole < memberRole =
messageError "x.grp.mem.del with insufficient member permissions" $> Nothing
| otherwise = a
deleteMemberItem gi gEvent = do
deleteMemberItem msg' gi gEvent = do
(gi', m', scopeInfo) <- mkGroupChatScope gi m
(ci, cInfo) <- saveRcvChatItemNoParse user (CDGroupRcv gi' scopeInfo m') msg brokerTs (CIRcvGroupEvent gEvent)
(ci, cInfo) <- saveRcvChatItemNoParse user (CDGroupRcv gi' scopeInfo m') msg' brokerTs (CIRcvGroupEvent gEvent)
groupMsgToView cInfo ci
deleteMessages :: MsgDirectionI d => GroupInfo -> GroupMember -> SMsgDirection d -> CM ()
deleteMessages gInfo' delMem msgDir
@@ -3168,10 +3188,6 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
deleteMemberConnection m
-- member record is not deleted to allow creation of "member left" chat item
gInfo' <- updateMemberRecordDeleted user gInfo m GSMemLeft
when (isRelay m) $
withStore' $ \db -> do
relay_ <- runExceptT $ getGroupRelayByGMId db (groupMemberId' m)
forM_ relay_ $ \relay -> void $ updateRelayStatus db relay RSInactive
gInfo'' <- updatePublicGroupData user gInfo'
unless (muteEventInChannel gInfo'' m) $ do
(gInfo''', m', scopeInfo) <- mkGroupChatScope gInfo'' m
@@ -3526,19 +3542,24 @@ runDeliveryTaskWorker a deliveryKey Worker {doWork} = do
processDeliveryTask :: MessageDeliveryTask -> CM ()
processDeliveryTask task@MessageDeliveryTask {jobScope} =
case jobScopeImpliedSpec jobScope of
DJDeliveryJob _includePending ->
withWorkItems a doWork (withStore' $ \db -> getNextDeliveryTasks db gInfo task) $ \nextTasks -> do
let (body, taskIds, largeTaskIds) = batchDeliveryTasks1 vr maxEncodedMsgLength nextTasks
withStore' $ \db -> do
createMsgDeliveryJob db gInfo jobScope (singleSenderGMId_ nextTasks) body
forM_ taskIds $ \taskId -> updateDeliveryTaskStatus db taskId DTSProcessed
forM_ largeTaskIds $ \taskId -> setDeliveryTaskErrStatus db taskId "large"
lift . void $ getDeliveryJobWorker True deliveryKey
DJDeliveryJob _includePending
| relayOwnStatus gInfo == Just RSInactive -> do
logWarn "delivery task worker: relay inactive"
withStore' $ \db -> setDeliveryTaskErrStatus db (deliveryTaskId task) "relay inactive"
| otherwise ->
withWorkItems a doWork (withStore' $ \db -> getNextDeliveryTasks db gInfo task) $ \nextTasks -> do
let (body, taskIds, largeTaskIds) = batchDeliveryTasks1 vr maxEncodedMsgLength nextTasks
withStore' $ \db -> do
createMsgDeliveryJob db gInfo jobScope (singleSenderGMId_ nextTasks) body
forM_ taskIds $ \taskId -> updateDeliveryTaskStatus db taskId DTSProcessed
forM_ largeTaskIds $ \taskId -> setDeliveryTaskErrStatus db taskId "large"
lift . void $ getDeliveryJobWorker True deliveryKey
where
singleSenderGMId_ :: NonEmpty MessageDeliveryTask -> Maybe GroupMemberId
singleSenderGMId_ (MessageDeliveryTask {senderGMId = senderGMId'} :| ts)
| all (\MessageDeliveryTask {senderGMId} -> senderGMId == senderGMId') ts = Just senderGMId'
| otherwise = Nothing
-- DJRelayRemoved is allowed when RSInactive - it forwards XGrpMemDel about relay's own deletion
DJRelayRemoved
| workerScope /= DWSGroup ->
throwChatError $ CEInternalError "delivery task worker: relay removed task in wrong worker scope"
@@ -3591,9 +3612,14 @@ runDeliveryJobWorker a deliveryKey Worker {doWork} = do
processDeliveryJob :: MessageDeliveryJob -> CM ()
processDeliveryJob job =
case jobScopeImpliedSpec jobScope of
DJDeliveryJob _includePending -> do
sendBodyToMembers
withStore' $ \db -> updateDeliveryJobStatus db jobId DJSComplete
DJDeliveryJob _includePending
| relayOwnStatus gInfo == Just RSInactive -> do
logWarn "delivery job worker: relay inactive"
withStore' $ \db -> setDeliveryJobErrStatus db (deliveryJobId job) "relay inactive"
| otherwise -> do
sendBodyToMembers
withStore' $ \db -> updateDeliveryJobStatus db jobId DJSComplete
-- DJRelayRemoved is allowed when RSInactive - it forwards XGrpMemDel about relay's own deletion
DJRelayRemoved
| workerScope /= DWSGroup ->
throwChatError $ CEInternalError "delivery job worker: relay removed job in wrong worker scope"