mirror of
https://github.com/simplex-chat/simplex-chat.git
synced 2026-05-24 10:55:33 +00:00
Merge branch 'master' into chat-relays
This commit is contained in:
@@ -114,6 +114,7 @@ defaultChatConfig =
|
||||
deliveryWorkerDelay = 0,
|
||||
deliveryBucketSize = 10000,
|
||||
deviceNameForRemote = "",
|
||||
remoteCompression = True,
|
||||
chatHooks = defaultChatHooks
|
||||
}
|
||||
|
||||
|
||||
@@ -96,7 +96,12 @@ import Simplex.RemoteControl.Types
|
||||
import System.IO (Handle)
|
||||
import System.Mem.Weak (Weak)
|
||||
import UnliftIO.STM
|
||||
#if !defined(dbPostgres)
|
||||
|
||||
#if defined(dbPostgres)
|
||||
import qualified Database.PostgreSQL.Simple as PSQL
|
||||
|
||||
type SQLError = PSQL.SqlError
|
||||
#else
|
||||
import Database.SQLite.Simple (SQLError)
|
||||
import qualified Database.SQLite.Simple as SQL
|
||||
import Simplex.Messaging.Agent.Store.SQLite.DB (SlowQueryStats (..))
|
||||
@@ -159,6 +164,7 @@ data ChatConfig = ChatConfig
|
||||
deliveryBucketSize :: Int,
|
||||
highlyAvailable :: Bool,
|
||||
deviceNameForRemote :: Text,
|
||||
remoteCompression :: Bool,
|
||||
chatHooks :: ChatHooks
|
||||
}
|
||||
|
||||
@@ -759,7 +765,7 @@ data ChatResponse
|
||||
| CRRemoteFileStored {remoteHostId :: RemoteHostId, remoteFileSource :: CryptoFile}
|
||||
| CRRemoteCtrlList {remoteCtrls :: [RemoteCtrlInfo]}
|
||||
| CRRemoteCtrlConnecting {remoteCtrl_ :: Maybe RemoteCtrlInfo, ctrlAppInfo :: CtrlAppInfo, appVersion :: AppVersion}
|
||||
| CRRemoteCtrlConnected {remoteCtrl :: RemoteCtrlInfo}
|
||||
| CRRemoteCtrlConnected {remoteCtrl :: RemoteCtrlInfo, compression :: Bool}
|
||||
| CRSQLResult {rows :: [Text]}
|
||||
#if !defined(dbPostgres)
|
||||
| CRArchiveExported {archiveErrors :: [ArchiveError]}
|
||||
@@ -862,7 +868,7 @@ data ChatEvent
|
||||
| CEvtNtfMessage {user :: User, connEntity :: ConnectionEntity, ntfMessage :: NtfMsgAckInfo}
|
||||
| CEvtRemoteHostSessionCode {remoteHost_ :: Maybe RemoteHostInfo, sessionCode :: Text}
|
||||
| CEvtNewRemoteHost {remoteHost :: RemoteHostInfo}
|
||||
| CEvtRemoteHostConnected {remoteHost :: RemoteHostInfo}
|
||||
| CEvtRemoteHostConnected {remoteHost :: RemoteHostInfo, compression :: Bool}
|
||||
| CEvtRemoteHostStopped {remoteHostId_ :: Maybe RemoteHostId, rhsState :: RemoteHostSessionState, rhStopReason :: RemoteHostStopReason}
|
||||
| CEvtRemoteCtrlFound {remoteCtrl :: RemoteCtrlInfo, ctrlAppInfo_ :: Maybe CtrlAppInfo, appVersion :: AppVersion, compatible :: Bool}
|
||||
| CEvtRemoteCtrlSessionCode {remoteCtrl_ :: Maybe RemoteCtrlInfo, sessionCode :: Text}
|
||||
@@ -902,7 +908,7 @@ allowRemoteEvent = \case
|
||||
CEvtChatSuspended -> False
|
||||
CEvtRemoteHostSessionCode {} -> False
|
||||
CEvtNewRemoteHost _ -> False
|
||||
CEvtRemoteHostConnected _ -> False
|
||||
CEvtRemoteHostConnected {} -> False
|
||||
CEvtRemoteHostStopped {} -> False
|
||||
CEvtRemoteCtrlFound {} -> False
|
||||
CEvtRemoteCtrlSessionCode {} -> False
|
||||
@@ -1403,7 +1409,8 @@ data RemoteCtrlSession
|
||||
| RCSessionConnecting
|
||||
{ remoteCtrlId_ :: Maybe RemoteCtrlId,
|
||||
rcsClient :: RCCtrlClient,
|
||||
rcsWaitSession :: Async ()
|
||||
rcsWaitSession :: Async (),
|
||||
ctrlAppInfo :: CtrlAppInfo
|
||||
}
|
||||
| RCSessionPendingConfirmation
|
||||
{ remoteCtrlId_ :: Maybe RemoteCtrlId,
|
||||
@@ -1412,7 +1419,8 @@ data RemoteCtrlSession
|
||||
tls :: TLS 'TClient,
|
||||
sessionCode :: Text,
|
||||
rcsWaitSession :: Async (),
|
||||
rcsWaitConfirmation :: TMVar (Either RCErrorType (RCCtrlSession, RCCtrlPairing))
|
||||
rcsWaitConfirmation :: TMVar (Either RCErrorType (RCCtrlSession, RCCtrlPairing)),
|
||||
ctrlAppInfo :: CtrlAppInfo
|
||||
}
|
||||
| RCSessionConnected
|
||||
{ remoteCtrlId :: RemoteCtrlId,
|
||||
@@ -1420,7 +1428,8 @@ data RemoteCtrlSession
|
||||
tls :: TLS 'TClient,
|
||||
rcsSession :: RCCtrlSession,
|
||||
http2Server :: Async (),
|
||||
remoteOutputQ :: TBQueue (Either ChatError ChatEvent)
|
||||
remoteOutputQ :: TBQueue (Either ChatError ChatEvent),
|
||||
ctrlAppInfo :: CtrlAppInfo
|
||||
}
|
||||
|
||||
data RemoteCtrlSessionState
|
||||
@@ -1544,25 +1553,24 @@ withFastStore = withStorePriority True
|
||||
withStorePriority :: Bool -> (DB.Connection -> ExceptT StoreError IO a) -> CM a
|
||||
withStorePriority priority action = do
|
||||
ChatController {chatStore} <- ask
|
||||
liftIOEither $ withTransactionPriority chatStore priority (runExceptT . withExceptT ChatErrorStore . action) `E.catches` handleDBErrors
|
||||
liftIOEither $ withTransactionPriority chatStore priority (runExceptT . withExceptT ChatErrorStore . action) `E.catch` handleDBErrors
|
||||
|
||||
withStoreBatch :: Traversable t => (DB.Connection -> t (IO (Either ChatError a))) -> CM' (t (Either ChatError a))
|
||||
withStoreBatch actions = do
|
||||
ChatController {chatStore} <- ask
|
||||
liftIO $ withTransaction chatStore $ mapM (`E.catches` handleDBErrors) . actions
|
||||
liftIO $ withTransaction chatStore $ mapM (`E.catch` handleDBErrors) . actions
|
||||
|
||||
-- TODO [postgres] postgres specific error handling
|
||||
handleDBErrors :: [E.Handler (Either ChatError a)]
|
||||
handleDBErrors =
|
||||
#if !defined(dbPostgres)
|
||||
( E.Handler $ \(e :: SQLError) ->
|
||||
let se = SQL.sqlError e
|
||||
busy = se == SQL.ErrorBusy || se == SQL.ErrorLocked
|
||||
in pure . Left . ChatErrorStore $ if busy then SEDBBusyError $ show se else SEDBException $ show e
|
||||
) :
|
||||
handleDBErrors :: E.SomeException -> IO (Either ChatError a)
|
||||
handleDBErrors e = pure $ Left $ ChatErrorStore $ case E.fromException e of
|
||||
Just (e' :: SQLError) ->
|
||||
#if defined(dbPostgres)
|
||||
SEDBException $ show e'
|
||||
#else
|
||||
let se = SQL.sqlError e'
|
||||
busy = se == SQL.ErrorBusy || se == SQL.ErrorLocked
|
||||
in (if busy then SEDBBusyError else SEDBException) $ show e'
|
||||
#endif
|
||||
[ E.Handler $ \(E.SomeException e) -> pure . Left . ChatErrorStore . SEDBException $ show e
|
||||
]
|
||||
Nothing -> SEDBException $ show e
|
||||
|
||||
withStoreBatch' :: Traversable t => (DB.Connection -> t (IO a)) -> CM' (t (Either ChatError a))
|
||||
withStoreBatch' actions = withStoreBatch $ fmap (fmap Right) . actions
|
||||
|
||||
@@ -167,9 +167,6 @@ startChatController mainApp enableSndFiles = do
|
||||
runExceptT (syncConnections' users) >>= \case
|
||||
Left e -> liftIO $ putStrLn $ "Error synchronizing connections: " <> show e
|
||||
Right _ -> pure ()
|
||||
runExceptT migrateMemberRelations >>= \case
|
||||
Left e -> liftIO $ putStrLn $ "Error migrating member relations: " <> show e
|
||||
Right _ -> pure ()
|
||||
restoreCalls
|
||||
s <- asks agentAsync
|
||||
readTVarIO s >>= maybe (start s users) (pure . fst)
|
||||
@@ -181,10 +178,6 @@ startChatController mainApp enableSndFiles = do
|
||||
(userDiff, connDiff) <- withAgent (\a -> syncConnections a aUserIds connIds)
|
||||
withFastStore' setConnectionsSyncTs
|
||||
toView $ CEvtConnectionsDiff (AgentUserId <$> userDiff) (AgentConnId <$> connDiff)
|
||||
migrateMemberRelations =
|
||||
when mainApp $
|
||||
whenM (withStore' hasMembersWithoutVector) $
|
||||
void $ forkIO runRelationsVectorMigration
|
||||
start s users = do
|
||||
a1 <- async agentSubscriber
|
||||
a2 <-
|
||||
@@ -269,7 +262,7 @@ stopChatController ChatController {smpAgent, agentAsync = s, sndFiles, rcvFiles,
|
||||
readTVarIO remoteHostSessions >>= mapM_ (cancelRemoteHost False . snd)
|
||||
atomically (stateTVar remoteCtrlSession (,Nothing)) >>= mapM_ (cancelRemoteCtrl False . snd)
|
||||
disconnectAgentClient smpAgent
|
||||
readTVarIO s >>= mapM_ (\(a1, a2) -> uninterruptibleCancel a1 >> mapM_ uninterruptibleCancel a2)
|
||||
readTVarIO s >>= mapM_ (\(a1, a2) -> forkIO $ uninterruptibleCancel a1 >> mapM_ uninterruptibleCancel a2)
|
||||
closeFiles sndFiles
|
||||
closeFiles rcvFiles
|
||||
atomically $ do
|
||||
@@ -1837,7 +1830,7 @@ processChatCommand vr nm = \case
|
||||
conn <- withFastStore $ \db -> getPendingContactConnection db userId connId
|
||||
let PendingContactConnection {pccConnStatus, connLinkInv} = conn
|
||||
case (pccConnStatus, connLinkInv) of
|
||||
(ConnNew, Just _ссLink) -> do
|
||||
(ConnNew, Just _ccLink) -> do
|
||||
newUser <- privateGetUser newUserId
|
||||
conn' <- recreateConn user conn newUser
|
||||
pure $ CRConnectionUserChanged user conn conn' newUser
|
||||
@@ -2995,7 +2988,7 @@ processChatCommand vr nm = \case
|
||||
ConfirmRemoteCtrl rcId -> withUser_ $ do
|
||||
(rc, ctrlAppInfo) <- confirmRemoteCtrl rcId
|
||||
pure CRRemoteCtrlConnecting {remoteCtrl_ = Just rc, ctrlAppInfo, appVersion = currentAppVersion}
|
||||
VerifyRemoteCtrlSession sessId -> withUser_ $ CRRemoteCtrlConnected <$> verifyRemoteCtrlSession (execChatCommand Nothing) sessId
|
||||
VerifyRemoteCtrlSession sessId -> withUser_ $ verifyRemoteCtrlSession (execChatCommand Nothing) sessId
|
||||
StopRemoteCtrl -> withUser_ $ stopRemoteCtrl >> ok_
|
||||
ListRemoteCtrls -> withUser_ $ CRRemoteCtrlList <$> listRemoteCtrls
|
||||
DeleteRemoteCtrl rc -> withUser_ $ deleteRemoteCtrl rc >> ok_
|
||||
@@ -4220,21 +4213,6 @@ agentSubscriber = do
|
||||
|
||||
type AgentSubResult = Map ConnId (Either AgentErrorType (Maybe ClientServiceId))
|
||||
|
||||
runRelationsVectorMigration :: CM ()
|
||||
runRelationsVectorMigration = do
|
||||
liftIO $ threadDelay' 5000000 -- 5 seconds (initial delay)
|
||||
migrateMembers
|
||||
where
|
||||
stepDelay = 1000000 -- 1 second
|
||||
migrateMembers = flip catchAllErrors eToView $ do
|
||||
lift waitChatStartedAndActivated
|
||||
gmIds <- withStore' getGMsWithoutVectorIds
|
||||
forM_ gmIds $ \gmId -> do
|
||||
lift waitChatStartedAndActivated
|
||||
withStore' (`migrateMemberRelationsVector'` gmId) `catchAllErrors` eToView
|
||||
liftIO $ threadDelay' stepDelay
|
||||
unless (null gmIds) migrateMembers
|
||||
|
||||
cleanupManager :: CM ()
|
||||
cleanupManager = do
|
||||
interval <- asks (cleanupManagerInterval . config)
|
||||
|
||||
@@ -1030,11 +1030,11 @@ introduceToModerators vr user gInfo@GroupInfo {groupId} m@GroupMember {memberRol
|
||||
else XMsgNew $ MCSimple $ extMsgContent (MCText pendingReviewMessage) Nothing
|
||||
void $ sendDirectMemberMessage mConn msg groupId
|
||||
modMs <- withStore' $ \db -> getGroupModerators db vr user gInfo
|
||||
let rcpModMs = filter shouldIntroduce modMs
|
||||
introduceMember vr user gInfo m rcpModMs (Just $ MSMember $ memberId' m)
|
||||
let rcpModMs = filter shouldIntroduceToMod modMs
|
||||
introduceMember user gInfo m rcpModMs (Just $ MSMember $ memberId' m)
|
||||
where
|
||||
shouldIntroduce :: GroupMember -> Bool
|
||||
shouldIntroduce mem =
|
||||
shouldIntroduceToMod :: GroupMember -> Bool
|
||||
shouldIntroduceToMod mem =
|
||||
memberCurrent mem
|
||||
&& groupMemberId' mem /= groupMemberId' m
|
||||
&& maxVersion (memberChatVRange mem) >= groupKnockingVersion
|
||||
@@ -1042,42 +1042,33 @@ introduceToModerators vr user gInfo@GroupInfo {groupId} m@GroupMember {memberRol
|
||||
introduceToAll :: VersionRangeChat -> User -> GroupInfo -> GroupMember -> CM ()
|
||||
introduceToAll vr user gInfo m = do
|
||||
members <- withStore' $ \db -> getGroupMembers db vr user gInfo
|
||||
vector_ <- withStore' (`getMemberRelationsVector_` m)
|
||||
let recipients = filter (shouldIntroduce vector_) members
|
||||
introduceMember vr user gInfo m recipients Nothing
|
||||
where
|
||||
shouldIntroduce :: Maybe ByteString -> GroupMember -> Bool
|
||||
shouldIntroduce vector_ m' =
|
||||
memberCurrent m'
|
||||
&& groupMemberId' m' /= groupMemberId' m
|
||||
&& maybe True (\v -> getRelation (indexInGroup m') v == MRNew) vector_
|
||||
vector <- withStore (`getMemberRelationsVector` m)
|
||||
let recipients = filter (shouldIntroduce m vector) members
|
||||
introduceMember user gInfo m recipients Nothing
|
||||
|
||||
introduceToRemaining :: VersionRangeChat -> User -> GroupInfo -> GroupMember -> CM ()
|
||||
introduceToRemaining vr user gInfo m = do
|
||||
members <- withStore' $ \db -> getGroupMembers db vr user gInfo
|
||||
vector_ <- withStore' (`getMemberRelationsVector_` m)
|
||||
recipients <- filterRecipients vector_ members
|
||||
introduceMember vr user gInfo m recipients Nothing
|
||||
where
|
||||
filterRecipients :: Maybe ByteString -> [GroupMember] -> CM [GroupMember]
|
||||
filterRecipients vector_ members = do
|
||||
newRelation <- case vector_ of
|
||||
Nothing -> do
|
||||
introducedGMIds <- S.fromList <$> withStore' (`getIntroducedGroupMemberIds` m)
|
||||
pure $ \m' -> groupMemberId' m' `S.notMember` introducedGMIds
|
||||
Just vec -> pure $ \m' -> getRelation (indexInGroup m') vec == MRNew
|
||||
pure $ filter (\m' -> groupMemberId' m' /= groupMemberId' m && memberCurrent m' && newRelation m') members
|
||||
vector <- withStore (`getMemberRelationsVector` m)
|
||||
let recipients = filter (shouldIntroduce m vector) members
|
||||
introduceMember user gInfo m recipients Nothing
|
||||
|
||||
introduceMember :: VersionRangeChat -> User -> GroupInfo -> GroupMember -> [GroupMember] -> Maybe MsgScope -> CM ()
|
||||
introduceMember _ _ _ GroupMember {activeConn = Nothing} _ _ = throwChatError $ CEInternalError "member connection not active"
|
||||
introduceMember vr user gInfo@GroupInfo {groupId} toMember@GroupMember {activeConn = Just conn} introduceToMembers msgScope = do
|
||||
shouldIntroduce :: GroupMember -> ByteString -> GroupMember -> Bool
|
||||
shouldIntroduce m vec mem =
|
||||
memberCurrent mem
|
||||
&& groupMemberId' mem /= groupMemberId' m
|
||||
&& getRelation (indexInGroup mem) vec == MRNew
|
||||
|
||||
introduceMember :: User -> GroupInfo -> GroupMember -> [GroupMember] -> Maybe MsgScope -> CM ()
|
||||
introduceMember _ _ GroupMember {activeConn = Nothing} _ _ = throwChatError $ CEInternalError "member connection not active"
|
||||
introduceMember user gInfo@GroupInfo {groupId} toMember@GroupMember {activeConn = Just conn} introduceToMembers msgScope = do
|
||||
void . sendGroupMessage' user gInfo introduceToMembers $ XGrpMemNew (memberInfo gInfo toMember) msgScope
|
||||
sendIntroductions introduceToMembers
|
||||
where
|
||||
sendIntroductions reMembers = do
|
||||
updateToMemberVector reMembers
|
||||
reMembers' <- withStore' $ \db -> createIntrosOrUpdateVectors db vr reMembers toMember
|
||||
shuffledReMembers <- liftIO $ shuffleMembers reMembers'
|
||||
updateReMembersVectors reMembers
|
||||
shuffledReMembers <- liftIO $ shuffleMembers reMembers
|
||||
if toMember `supportsVersion` batchSendVersion
|
||||
then do
|
||||
let events = map memberIntro shuffledReMembers
|
||||
@@ -1089,6 +1080,10 @@ introduceMember vr user gInfo@GroupInfo {groupId} toMember@GroupMember {activeCo
|
||||
updateToMemberVector reMembers = do
|
||||
let relations = map (\GroupMember {indexInGroup} -> (indexInGroup, (IDReferencedIntroduced, MRIntroduced))) reMembers
|
||||
withStore' $ \db -> setMemberVectorNewRelations db toMember relations
|
||||
updateReMembersVectors :: [GroupMember] -> CM ()
|
||||
updateReMembersVectors reMembers = do
|
||||
let GroupMember {indexInGroup} = toMember
|
||||
withStore' $ \db -> setMembersVectorsNewRelation db reMembers indexInGroup IDSubjectIntroduced MRIntroduced
|
||||
memberIntro :: GroupMember -> ChatMsgEvent 'Json
|
||||
memberIntro reMember =
|
||||
let mInfo = memberInfo gInfo reMember
|
||||
@@ -2026,7 +2021,7 @@ sendGroupMessages_ _user gInfo@GroupInfo {groupId} recipientMembers events = do
|
||||
pendingReq SndMessage {msgId} = (groupMemberId, msgId)
|
||||
createPendingMsg :: DB.Connection -> (GroupMemberId, MessageId) -> IO (Either ChatError ())
|
||||
createPendingMsg db (groupMemberId, msgId) =
|
||||
createPendingGroupMessage db groupMemberId msgId Nothing $> Right ()
|
||||
createPendingGroupMessage db groupMemberId msgId $> Right ()
|
||||
|
||||
data MemberSendAction = MSASend Connection | MSASendBatched Connection | MSAPending | MSAForwarded
|
||||
|
||||
@@ -2089,32 +2084,25 @@ readyMemberConn GroupMember {groupMemberId, activeConn = Just conn@Connection {c
|
||||
| otherwise = Nothing
|
||||
readyMemberConn GroupMember {activeConn = Nothing} = Nothing
|
||||
|
||||
sendGroupMemberMessage :: MsgEncodingI e => GroupInfo -> GroupMember -> ChatMsgEvent e -> Maybe GroupMemberIntro -> CM () -> CM ()
|
||||
sendGroupMemberMessage gInfo@GroupInfo {groupId} m@GroupMember {groupMemberId} chatMsgEvent intro_ postDeliver = do
|
||||
sendGroupMemberMessage :: MsgEncodingI e => GroupInfo -> GroupMember -> ChatMsgEvent e -> CM ()
|
||||
sendGroupMemberMessage gInfo@GroupInfo {groupId} m@GroupMember {groupMemberId} chatMsgEvent = do
|
||||
msg <- createSndMessage chatMsgEvent (GroupId groupId)
|
||||
messageMember msg `catchAllErrors` eToView
|
||||
where
|
||||
messageMember :: SndMessage -> CM ()
|
||||
messageMember SndMessage {msgId, msgBody} = forM_ (memberSendAction gInfo (chatMsgEvent :| []) [m] m) $ \case
|
||||
MSASend conn -> deliverMessage conn (toCMEventTag chatMsgEvent) msgBody msgId >> postDeliver
|
||||
MSASendBatched conn -> deliverMessage conn (toCMEventTag chatMsgEvent) msgBody msgId >> postDeliver
|
||||
MSAPending -> withStore' $ \db -> createPendingGroupMessage db groupMemberId msgId (introId <$> intro_)
|
||||
MSASend conn -> void $ deliverMessage conn (toCMEventTag chatMsgEvent) msgBody msgId
|
||||
MSASendBatched conn -> void $ deliverMessage conn (toCMEventTag chatMsgEvent) msgBody msgId
|
||||
MSAPending -> withStore' $ \db -> createPendingGroupMessage db groupMemberId msgId
|
||||
MSAForwarded -> pure ()
|
||||
|
||||
-- TODO ensure order - pending messages interleave with user input messages
|
||||
sendPendingGroupMessages :: User -> GroupMember -> Connection -> CM ()
|
||||
sendPendingGroupMessages user GroupMember {groupMemberId} conn = do
|
||||
pgms <- withStore' $ \db -> getPendingGroupMessages db groupMemberId
|
||||
forM_ (L.nonEmpty pgms) $ \pgms' -> do
|
||||
let msgs = L.map (\(sndMsg, _, _) -> sndMsg) pgms'
|
||||
void $ batchSendConnMessages user conn MsgFlags {notification = True} msgs
|
||||
lift . void . withStoreBatch' $ \db -> L.map (\SndMessage {msgId} -> deletePendingGroupMessage db groupMemberId msgId) msgs
|
||||
lift . void . withStoreBatch' $ \db -> L.map (\(_, tag, introId_) -> updateIntro_ db tag introId_) pgms'
|
||||
where
|
||||
updateIntro_ :: DB.Connection -> ACMEventTag -> Maybe Int64 -> IO ()
|
||||
updateIntro_ db tag introId_ = case (tag, introId_) of
|
||||
(ACMEventTag _ XGrpMemFwd_, Just introId) -> updateIntroStatus db introId GMIntroInvForwarded
|
||||
_ -> pure ()
|
||||
msgs <- withStore' $ \db -> getPendingGroupMessages db groupMemberId
|
||||
forM_ (L.nonEmpty msgs) $ \msgs' -> do
|
||||
void $ batchSendConnMessages user conn MsgFlags {notification = True} msgs'
|
||||
lift . void . withStoreBatch' $ \db -> L.map (\SndMessage {msgId} -> deletePendingGroupMessage db groupMemberId msgId) msgs'
|
||||
|
||||
saveDirectRcvMSG :: MsgEncodingI e => Connection -> MsgMeta -> MsgBody -> ChatMessage e -> CM (Connection, RcvMessage)
|
||||
saveDirectRcvMSG conn@Connection {connId} agentMsgMeta msgBody ChatMessage {chatVRange, msgId = sharedMsgId_, chatMsgEvent} = do
|
||||
|
||||
@@ -2615,14 +2615,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
GCInviteeMember ->
|
||||
withStore' (\db -> runExceptT $ getGroupMemberByMemberId db vr user gInfo memId) >>= \case
|
||||
Left _ -> messageError "x.grp.mem.inv error: referenced member does not exist"
|
||||
Right reMember -> do
|
||||
intro_ <- withStore' $ \db -> getIntroduction db reMember m
|
||||
update intro_ GMIntroInvReceived
|
||||
sendGroupMemberMessage gInfo reMember (XGrpMemFwd (memberInfo gInfo m) introInv) intro_ $
|
||||
update intro_ GMIntroInvForwarded
|
||||
where
|
||||
update (Just GroupMemberIntro {introId}) status = withStore' $ \db -> updateIntroStatus db introId status
|
||||
update Nothing _ = pure ()
|
||||
Right reMember -> sendGroupMemberMessage gInfo reMember $ XGrpMemFwd (memberInfo gInfo m) introInv
|
||||
_ -> messageError "x.grp.mem.inv can be only sent by invitee member"
|
||||
|
||||
xGrpMemFwd :: GroupInfo -> GroupMember -> MemberInfo -> IntroInvitation -> CM ()
|
||||
@@ -2718,8 +2711,6 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
xGrpMemCon :: GroupInfo -> GroupMember -> MemberId -> CM ()
|
||||
xGrpMemCon gInfo sendingMem memId = do
|
||||
refMem <- withStore $ \db -> getGroupMemberByMemberId db vr user gInfo memId
|
||||
withStore' (`migrateMemberRelationsVector` sendingMem)
|
||||
withStore' (`migrateMemberRelationsVector` refMem)
|
||||
-- Updating vectors in separate transactions to avoid deadlocks.
|
||||
withStore $ \db -> setMemberVectorRelationConnected db sendingMem refMem MRSubjectConnected
|
||||
withStore $ \db -> setMemberVectorRelationConnected db refMem sendingMem MRReferencedConnected
|
||||
@@ -2783,7 +2774,7 @@ processAgentMessageConn vr user@User {userId} corrId agentConnId agentMessage =
|
||||
let GroupMember {memberId} = m
|
||||
memberName = Just $ memberShortenedName m
|
||||
event = XGrpMsgForward memberId memberName chatMsg brokerTs
|
||||
sendGroupMemberMessage gInfo member event Nothing (pure ())
|
||||
sendGroupMemberMessage gInfo member event
|
||||
|
||||
-- TODO [channels fwd] base on differentiation between groups and channels
|
||||
isUserGrpFwdRelay :: GroupInfo -> Bool
|
||||
@@ -3228,7 +3219,7 @@ runDeliveryJobWorker a deliveryKey Worker {doWork} = do
|
||||
unless (null ms) $ deliver body ms
|
||||
where
|
||||
buildMemberList sender = do
|
||||
vec <- withStore $ \db -> migrateGetMemberRelationsVector db sender
|
||||
vec <- withStore (`getMemberRelationsVector` sender)
|
||||
-- this excludes the sender
|
||||
let introducedMemsIdxs = getRelationsIndexes MRIntroduced vec
|
||||
case jobScope of
|
||||
|
||||
@@ -42,7 +42,7 @@ chatDbOptsP _appDir defaultDbName = do
|
||||
( long "pool-size"
|
||||
<> metavar "DB_POOL_SIZE"
|
||||
<> help "Database connection pool size"
|
||||
<> value 10
|
||||
<> value 1
|
||||
<> showDefault
|
||||
)
|
||||
dbCreateSchema <-
|
||||
@@ -84,7 +84,7 @@ mobileDbOpts schemaPrefix connstr = do
|
||||
ChatDbOpts
|
||||
{ dbConnstr,
|
||||
dbSchemaPrefix,
|
||||
dbPoolSize = 10,
|
||||
dbPoolSize = 1,
|
||||
dbCreateSchema = True
|
||||
}
|
||||
|
||||
|
||||
+31
-14
@@ -165,7 +165,8 @@ startRemoteHost rh_ rcAddrPrefs_ port_ = do
|
||||
where
|
||||
mkCtrlAppInfo = do
|
||||
deviceName <- chatReadVar localDeviceName
|
||||
pure CtrlAppInfo {appVersionRange = ctrlAppVersionRange, deviceName}
|
||||
useCompression <- asks $ remoteCompression . config
|
||||
pure CtrlAppInfo {appVersionRange = ctrlAppVersionRange, deviceName, compression = BoolDef useCompression}
|
||||
parseHostAppInfo :: RCHostHello -> ExceptT RemoteHostError IO HostAppInfo
|
||||
parseHostAppInfo RCHostHello {app = hostAppInfo} = do
|
||||
hostInfo@HostAppInfo {appVersion, encoding} <-
|
||||
@@ -213,7 +214,9 @@ startRemoteHost rh_ rcAddrPrefs_ port_ = do
|
||||
RHSessionConfirmed _ RHPendingSession {rchClient} -> Right ((), RHSessionConnected {rchClient, tls, rhClient, pollAction, storePath})
|
||||
_ -> Left $ ChatErrorRemoteHost rhKey RHEBadState
|
||||
chatWriteVar currentRemoteHost $ Just remoteHostId -- this is required for commands to be passed to remote host
|
||||
toView $ CEvtRemoteHostConnected rhi {sessionState = Just RHSConnected {sessionCode}}
|
||||
let RemoteHostClient {encryption = RemoteCrypto {compression}} = rhClient
|
||||
remoteHost = rhi {sessionState = Just RHSConnected {sessionCode}} :: RemoteHostInfo
|
||||
toView $ CEvtRemoteHostConnected {remoteHost, compression}
|
||||
upsertRemoteHost :: RCHostPairing -> Maybe RemoteHostInfo -> Maybe RCCtrlAddress -> Text -> SessionSeq -> RemoteHostSessionState -> CM RemoteHostInfo
|
||||
upsertRemoteHost pairing'@RCHostPairing {knownHost = kh_} rhi_ rcAddr_ hostDeviceName sseq state = do
|
||||
KnownHostPairing {hostDhPubKey = hostDhPubKey'} <- maybe (throwError . ChatError $ CEInternalError "KnownHost is known after verification") pure kh_
|
||||
@@ -459,7 +462,7 @@ startRemoteCtrlSession = do
|
||||
|
||||
connectRemoteCtrl :: RCVerifiedInvitation -> SessionSeq -> CM (Maybe RemoteCtrlInfo, CtrlAppInfo)
|
||||
connectRemoteCtrl verifiedInv@(RCVerifiedInvitation inv@RCInvitation {ca, app}) sseq = handleCtrlError sseq RCSRConnectionFailed "connectRemoteCtrl" $ do
|
||||
ctrlInfo@CtrlAppInfo {deviceName = ctrlDeviceName} <- parseCtrlAppInfo app
|
||||
ctrlInfo <- parseCtrlAppInfo app
|
||||
v <- checkAppVersion ctrlInfo
|
||||
rc_ <- withStore' $ \db -> getRemoteCtrlByFingerprint db ca
|
||||
mapM_ (validateRemoteCtrl inv) rc_
|
||||
@@ -469,23 +472,23 @@ connectRemoteCtrl verifiedInv@(RCVerifiedInvitation inv@RCInvitation {ca, app})
|
||||
cmdOk <- newEmptyTMVarIO
|
||||
rcsWaitSession <- async $ do
|
||||
atomically $ takeTMVar cmdOk
|
||||
handleCtrlError sseq RCSRConnectionFailed "waitForCtrlSession" $ waitForCtrlSession rc_ ctrlDeviceName rcsClient vars
|
||||
handleCtrlError sseq RCSRConnectionFailed "waitForCtrlSession" $ waitForCtrlSession rc_ ctrlInfo rcsClient vars
|
||||
updateRemoteCtrlSession sseq $ \case
|
||||
RCSessionStarting -> Right RCSessionConnecting {remoteCtrlId_ = remoteCtrlId' <$> rc_, rcsClient, rcsWaitSession}
|
||||
RCSessionStarting -> Right RCSessionConnecting {remoteCtrlId_ = remoteCtrlId' <$> rc_, rcsClient, rcsWaitSession, ctrlAppInfo = ctrlInfo}
|
||||
_ -> Left $ ChatErrorRemoteCtrl RCEBadState
|
||||
atomically $ putTMVar cmdOk ()
|
||||
pure ((`remoteCtrlInfo` Just RCSConnecting) <$> rc_, ctrlInfo)
|
||||
where
|
||||
validateRemoteCtrl RCInvitation {idkey} RemoteCtrl {ctrlPairing = RCCtrlPairing {idPubKey}} =
|
||||
unless (idkey == idPubKey) $ throwError $ ChatErrorRemoteCtrl $ RCEProtocolError $ PRERemoteControl RCEIdentity
|
||||
waitForCtrlSession :: Maybe RemoteCtrl -> Text -> RCCtrlClient -> RCStepTMVar (ByteString, TLS 'TClient, RCStepTMVar (RCCtrlSession, RCCtrlPairing)) -> CM ()
|
||||
waitForCtrlSession rc_ ctrlName rcsClient vars = do
|
||||
waitForCtrlSession :: Maybe RemoteCtrl -> CtrlAppInfo -> RCCtrlClient -> RCStepTMVar (ByteString, TLS 'TClient, RCStepTMVar (RCCtrlSession, RCCtrlPairing)) -> CM ()
|
||||
waitForCtrlSession rc_ ctrlAppInfo@CtrlAppInfo {deviceName = ctrlName} rcsClient vars = do
|
||||
(uniq, tls, rcsWaitConfirmation) <- timeoutThrow (ChatErrorRemoteCtrl RCETimeout) networkIOTimeout $ takeRCStep vars
|
||||
let sessionCode = verificationCode uniq
|
||||
updateRemoteCtrlSession sseq $ \case
|
||||
RCSessionConnecting {rcsWaitSession} ->
|
||||
let remoteCtrlId_ = remoteCtrlId' <$> rc_
|
||||
in Right RCSessionPendingConfirmation {remoteCtrlId_, ctrlDeviceName = ctrlName, rcsClient, tls, sessionCode, rcsWaitSession, rcsWaitConfirmation}
|
||||
in Right RCSessionPendingConfirmation {remoteCtrlId_, ctrlDeviceName = ctrlName, rcsClient, tls, sessionCode, rcsWaitSession, rcsWaitConfirmation, ctrlAppInfo}
|
||||
_ -> Left $ ChatErrorRemoteCtrl RCEBadState
|
||||
toView CEvtRemoteCtrlSessionCode {remoteCtrl_ = (`remoteCtrlInfo` Just RCSPendingConfirmation {sessionCode}) <$> rc_, sessionCode}
|
||||
checkAppVersion CtrlAppInfo {appVersionRange} =
|
||||
@@ -495,7 +498,8 @@ connectRemoteCtrl verifiedInv@(RCVerifiedInvitation inv@RCInvitation {ca, app})
|
||||
getHostAppInfo appVersion = do
|
||||
hostDeviceName <- chatReadVar localDeviceName
|
||||
encryptFiles <- chatReadVar encryptLocalFiles
|
||||
pure HostAppInfo {appVersion, deviceName = hostDeviceName, encoding = localEncoding, encryptFiles}
|
||||
useCompression <- asks $ remoteCompression . config
|
||||
pure HostAppInfo {appVersion, deviceName = hostDeviceName, encoding = localEncoding, encryptFiles, compression = BoolDef useCompression}
|
||||
|
||||
parseCtrlAppInfo :: JT.Value -> CM CtrlAppInfo
|
||||
parseCtrlAppInfo ctrlAppInfo = do
|
||||
@@ -514,7 +518,7 @@ handleRemoteCommand execCC encryption remoteOutputQ HTTP2Request {request, reqBo
|
||||
parseRequest :: ExceptT RemoteProtocolError IO (C.SbKeyNonce, GetChunk, RemoteCommand)
|
||||
parseRequest = do
|
||||
(rfKN, header, getNext) <- parseDecryptHTTP2Body encryption request reqBody
|
||||
(rfKN,getNext,) <$> liftEitherWith RPEInvalidJSON (J.eitherDecode header)
|
||||
(rfKN,getNext,) <$> liftEitherWith RPEInvalidJSON (J.eitherDecodeStrict header)
|
||||
replyError = reply . RRChatResponse . RRError
|
||||
processCommand :: User -> C.SbKeyNonce -> GetChunk -> RemoteCommand -> CM ()
|
||||
processCommand user rfKN getNext = \case
|
||||
@@ -611,7 +615,7 @@ remoteCtrlInfo RemoteCtrl {remoteCtrlId, ctrlDeviceName} sessionState =
|
||||
RemoteCtrlInfo {remoteCtrlId, ctrlDeviceName, sessionState}
|
||||
|
||||
-- | Take a look at emoji of tlsunique, commit pairing, and start session server
|
||||
verifyRemoteCtrlSession :: (ByteString -> Int -> CM' (Either ChatError ChatResponse)) -> Text -> CM RemoteCtrlInfo
|
||||
verifyRemoteCtrlSession :: (ByteString -> Int -> CM' (Either ChatError ChatResponse)) -> Text -> CM ChatResponse
|
||||
verifyRemoteCtrlSession execCC sessCode' = do
|
||||
(sseq, client, ctrlName, sessionCode, vars) <-
|
||||
chatReadVar remoteCtrlSession >>= \case
|
||||
@@ -625,14 +629,15 @@ verifyRemoteCtrlSession execCC sessCode' = do
|
||||
(rcsSession@RCCtrlSession {tls, sessionKeys}, rcCtrlPairing) <- timeoutThrow (ChatErrorRemoteCtrl RCETimeout) networkIOTimeout $ takeRCStep vars
|
||||
rc@RemoteCtrl {remoteCtrlId} <- upsertRemoteCtrl ctrlName rcCtrlPairing
|
||||
remoteOutputQ <- asks (tbqSize . config) >>= newTBQueueIO
|
||||
encryption <- mkCtrlRemoteCrypto sessionKeys $ tlsUniq tls
|
||||
encryption@RemoteCrypto {compression} <- mkCtrlRemoteCrypto sessionKeys (tlsUniq tls) =<< getRemoteCtrlAppInfo sseq
|
||||
cc <- ask
|
||||
http2Server <- liftIO . async $ attachHTTP2Server tls $ \req -> handleRemoteCommand execCC encryption remoteOutputQ req `runReaderT` cc
|
||||
void . forkIO $ monitor sseq http2Server
|
||||
updateRemoteCtrlSession sseq $ \case
|
||||
RCSessionPendingConfirmation {} -> Right RCSessionConnected {remoteCtrlId, rcsClient = client, rcsSession, tls, http2Server, remoteOutputQ}
|
||||
RCSessionPendingConfirmation {ctrlAppInfo} -> Right RCSessionConnected {remoteCtrlId, rcsClient = client, rcsSession, tls, http2Server, remoteOutputQ, ctrlAppInfo}
|
||||
_ -> Left $ ChatErrorRemoteCtrl RCEBadState
|
||||
pure $ remoteCtrlInfo rc $ Just RCSConnected {sessionCode = tlsSessionCode tls}
|
||||
let remoteCtrl = remoteCtrlInfo rc $ Just RCSConnected {sessionCode = tlsSessionCode tls}
|
||||
pure CRRemoteCtrlConnected {remoteCtrl, compression}
|
||||
where
|
||||
upsertRemoteCtrl :: Text -> RCCtrlPairing -> CM RemoteCtrl
|
||||
upsertRemoteCtrl ctrlName rcCtrlPairing = withStore $ \db -> do
|
||||
@@ -717,6 +722,18 @@ updateRemoteCtrlSession sseq state = do
|
||||
Right st' -> Right () <$ writeTVar session (Just (sseq, st'))
|
||||
liftEither r
|
||||
|
||||
getRemoteCtrlAppInfo :: SessionSeq -> CM (Maybe CtrlAppInfo)
|
||||
getRemoteCtrlAppInfo sseq = chatReadVar remoteCtrlSession $>>= pure . appInfo
|
||||
where
|
||||
appInfo (currSseq, sess)
|
||||
| sseq == currSseq = case sess of
|
||||
RCSessionStarting -> Nothing
|
||||
RCSessionSearching {} -> Nothing
|
||||
RCSessionConnecting {ctrlAppInfo} -> Just ctrlAppInfo
|
||||
RCSessionPendingConfirmation {ctrlAppInfo} -> Just ctrlAppInfo
|
||||
RCSessionConnected {ctrlAppInfo} -> Just ctrlAppInfo
|
||||
| otherwise = Nothing
|
||||
|
||||
utf8String :: [Char] -> ByteString
|
||||
utf8String = encodeUtf8 . T.pack
|
||||
{-# INLINE utf8String #-}
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
|
||||
module Simplex.Chat.Remote.Protocol where
|
||||
|
||||
import qualified Codec.Compression.Zstd as Z1
|
||||
import Control.Monad
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.Reader
|
||||
@@ -27,6 +28,7 @@ import Data.ByteString (ByteString)
|
||||
import qualified Data.ByteString as B
|
||||
import Data.ByteString.Builder (Builder, byteString, lazyByteString)
|
||||
import qualified Data.ByteString.Lazy as LB
|
||||
import qualified Data.ByteString.Lazy.Internal as LB
|
||||
import Data.String (fromString)
|
||||
import Data.Text (Text)
|
||||
import Data.Text.Encoding (decodeUtf8)
|
||||
@@ -37,6 +39,7 @@ import Network.Transport.Internal (decodeWord32, encodeWord32)
|
||||
import Simplex.Chat.Controller
|
||||
import Simplex.Chat.Remote.Transport
|
||||
import Simplex.Chat.Remote.Types
|
||||
import Simplex.Chat.Types (BoolDef (..))
|
||||
import Simplex.FileTransfer.Description (FileDigest (..))
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Crypto.File (CryptoFile (..))
|
||||
@@ -102,10 +105,10 @@ $(JQ.deriveJSON (taggedObjectJSON $ dropPrefix "RR") ''RemoteResponse)
|
||||
-- * Client side / desktop
|
||||
|
||||
mkRemoteHostClient :: HTTP2Client -> HostSessKeys -> SessionCode -> FilePath -> HostAppInfo -> CM RemoteHostClient
|
||||
mkRemoteHostClient httpClient sessionKeys sessionCode storePath HostAppInfo {encoding, deviceName, encryptFiles} = do
|
||||
mkRemoteHostClient httpClient sessionKeys sessionCode storePath HostAppInfo {encoding, deviceName, encryptFiles, compression} = do
|
||||
let HostSessKeys {chainKeys, idPrivKey, sessPrivKey} = sessionKeys
|
||||
signatures = RSSign {idPrivKey, sessPrivKey}
|
||||
encryption <- liftIO $ mkRemoteCrypto sessionCode chainKeys signatures
|
||||
encryption <- mkRemoteCrypto sessionCode chainKeys signatures $ isTrue compression
|
||||
pure
|
||||
RemoteHostClient
|
||||
{ hostEncoding = encoding,
|
||||
@@ -116,17 +119,19 @@ mkRemoteHostClient httpClient sessionKeys sessionCode storePath HostAppInfo {enc
|
||||
storePath
|
||||
}
|
||||
|
||||
mkCtrlRemoteCrypto :: CtrlSessKeys -> SessionCode -> CM RemoteCrypto
|
||||
mkCtrlRemoteCrypto CtrlSessKeys {chainKeys, idPubKey, sessPubKey} sessionCode =
|
||||
mkCtrlRemoteCrypto :: CtrlSessKeys -> SessionCode -> Maybe CtrlAppInfo -> CM RemoteCrypto
|
||||
mkCtrlRemoteCrypto CtrlSessKeys {chainKeys, idPubKey, sessPubKey} sessionCode ctrlAppInfo_ = do
|
||||
let signatures = RSVerify {idPubKey, sessPubKey}
|
||||
in liftIO $ mkRemoteCrypto sessionCode chainKeys signatures
|
||||
peerCompression = maybe False (\CtrlAppInfo {compression} -> isTrue compression) ctrlAppInfo_
|
||||
mkRemoteCrypto sessionCode chainKeys signatures peerCompression
|
||||
|
||||
mkRemoteCrypto :: SessionCode -> TSbChainKeys -> RemoteSignatures -> IO RemoteCrypto
|
||||
mkRemoteCrypto sessionCode chainKeys signatures = do
|
||||
mkRemoteCrypto :: SessionCode -> TSbChainKeys -> RemoteSignatures -> Bool -> CM RemoteCrypto
|
||||
mkRemoteCrypto sessionCode chainKeys signatures peerCompression = do
|
||||
sndCounter <- newTVarIO 0
|
||||
rcvCounter <- newTVarIO 0
|
||||
skippedKeys <- liftIO TM.emptyIO
|
||||
pure RemoteCrypto {sessionCode, sndCounter, rcvCounter, chainKeys, skippedKeys, signatures}
|
||||
useCompression <- asks $ remoteCompression . config
|
||||
pure RemoteCrypto {sessionCode, sndCounter, rcvCounter, chainKeys, skippedKeys, signatures, compression = peerCompression && useCompression}
|
||||
|
||||
closeRemoteHostClient :: RemoteHostClient -> IO ()
|
||||
closeRemoteHostClient RemoteHostClient {httpClient} = closeHTTP2Client httpClient
|
||||
@@ -176,7 +181,7 @@ sendRemoteCommand RemoteHostClient {httpClient, hostEncoding, encryption} file_
|
||||
let req = httpRequest encFile_ encCmd
|
||||
HTTP2Response {response, respBody} <- liftError' (RPEHTTP2 . tshow) $ sendRequestDirect httpClient req Nothing
|
||||
(rfKN, header, getNext) <- parseDecryptHTTP2Body encryption response respBody
|
||||
rr <- liftEitherWith (RPEInvalidJSON . fromString) $ J.eitherDecode header >>= JT.parseEither J.parseJSON . convertJSON hostEncoding localEncoding
|
||||
rr <- liftEitherWith (RPEInvalidJSON . fromString) $ J.eitherDecodeStrict header >>= JT.parseEither J.parseJSON . convertJSON hostEncoding localEncoding
|
||||
pure (rfKN, getNext, rr)
|
||||
where
|
||||
httpRequest encFile_ cmdBld = H.requestStreaming N.methodPost "/" mempty $ \send flush -> do
|
||||
@@ -247,8 +252,11 @@ pattern OwsfTag = (SingleFieldJSONTag, J.Bool True)
|
||||
-- See https://github.com/simplex-chat/simplexmq/blob/master/rfcs/2023-10-25-remote-control.md for encoding
|
||||
|
||||
encryptEncodeHTTP2Body :: Word32 -> C.SbKeyNonce -> RemoteCrypto -> LazyByteString -> ExceptT RemoteProtocolError IO Builder
|
||||
encryptEncodeHTTP2Body corrId cmdKN RemoteCrypto {sessionCode, signatures} s = do
|
||||
ct <- liftError PRERemoteControl $ RC.rcEncryptBody cmdKN $ LB.fromStrict (smpEncode sessionCode) <> s
|
||||
encryptEncodeHTTP2Body corrId cmdKN RemoteCrypto {sessionCode, signatures, compression} s = do
|
||||
let s'
|
||||
| compression = LB.fromStrict $ Z1.compress 3 $ LB.toStrict s
|
||||
| otherwise = s
|
||||
ct <- liftError PRERemoteControl $ RC.rcEncryptBody cmdKN $ LB.Chunk (smpEncode sessionCode) s'
|
||||
let ctLen = encodeWord32 (fromIntegral $ LB.length ct)
|
||||
signed = LB.fromStrict (encodeWord32 corrId <> ctLen) <> ct
|
||||
sigs <- bodySignatures signed
|
||||
@@ -266,12 +274,12 @@ encryptEncodeHTTP2Body corrId cmdKN RemoteCrypto {sessionCode, signatures} s = d
|
||||
sign k = C.signatureBytes . C.sign' k . BA.convert . CH.hashFinalize
|
||||
|
||||
-- | Parse and decrypt HTTP2 request/response
|
||||
parseDecryptHTTP2Body :: HTTP2BodyChunk a => RemoteCrypto -> a -> HTTP2Body -> ExceptT RemoteProtocolError IO (C.SbKeyNonce, LazyByteString, Int -> IO ByteString)
|
||||
parseDecryptHTTP2Body rc@RemoteCrypto {sessionCode, signatures} hr HTTP2Body {bodyBuffer} = do
|
||||
parseDecryptHTTP2Body :: HTTP2BodyChunk a => RemoteCrypto -> a -> HTTP2Body -> ExceptT RemoteProtocolError IO (C.SbKeyNonce, ByteString, Int -> IO ByteString)
|
||||
parseDecryptHTTP2Body rc@RemoteCrypto {sessionCode, signatures, compression} hr HTTP2Body {bodyBuffer} = do
|
||||
(corrId, ct) <- getBody
|
||||
(cmdKN, rfKN) <- ExceptT $ atomically $ getRemoteRcvKeys rc corrId
|
||||
s <- liftError PRERemoteControl $ RC.rcDecryptBody cmdKN ct
|
||||
s' <- parseBody s
|
||||
s' <- decompress =<< parseBody s
|
||||
pure (rfKN, s', getNext)
|
||||
where
|
||||
getBody :: ExceptT RemoteProtocolError IO (Word32, LazyByteString)
|
||||
@@ -320,3 +328,10 @@ parseDecryptHTTP2Body rc@RemoteCrypto {sessionCode, signatures} hr HTTP2Body {bo
|
||||
unless (LB.length bs == n) $ throwError PRESessionCode
|
||||
pure (LB.toStrict bs, rest)
|
||||
getNext sz = getBuffered bodyBuffer sz Nothing $ getBodyChunk hr
|
||||
decompress :: LazyByteString -> ExceptT RemoteProtocolError IO ByteString
|
||||
decompress s
|
||||
| compression = case Z1.decompress $ LB.toStrict s of
|
||||
Z1.Error e -> throwError $ RPEInvalidBody e
|
||||
Z1.Skip -> pure B.empty
|
||||
Z1.Decompress s' -> pure s'
|
||||
| otherwise = pure $ LB.toStrict s
|
||||
|
||||
@@ -21,7 +21,7 @@ import Data.Int (Int64)
|
||||
import Data.Text (Text)
|
||||
import Data.Word (Word16, Word32)
|
||||
import Simplex.Chat.Remote.AppVersion
|
||||
import Simplex.Chat.Types (verificationCode)
|
||||
import Simplex.Chat.Types (BoolDef, verificationCode)
|
||||
import qualified Simplex.Messaging.Crypto as C
|
||||
import Simplex.Messaging.Crypto.File (CryptoFile)
|
||||
import Simplex.Messaging.Parsers (defaultJSON, dropPrefix, enumJSON, sumTypeJSON)
|
||||
@@ -47,7 +47,8 @@ data RemoteCrypto = RemoteCrypto
|
||||
rcvCounter :: TVar Word32,
|
||||
chainKeys :: TSbChainKeys,
|
||||
skippedKeys :: TM.TMap Word32 (C.SbKeyNonce, C.SbKeyNonce),
|
||||
signatures :: RemoteSignatures
|
||||
signatures :: RemoteSignatures,
|
||||
compression :: Bool
|
||||
}
|
||||
|
||||
getRemoteSndKeys :: RemoteCrypto -> STM (Word32, C.SbKeyNonce, C.SbKeyNonce)
|
||||
@@ -220,7 +221,8 @@ data RemoteFile = RemoteFile
|
||||
|
||||
data CtrlAppInfo = CtrlAppInfo
|
||||
{ appVersionRange :: AppVersionRange,
|
||||
deviceName :: Text
|
||||
deviceName :: Text,
|
||||
compression :: BoolDef
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
@@ -228,7 +230,8 @@ data HostAppInfo = HostAppInfo
|
||||
{ appVersion :: AppVersion,
|
||||
deviceName :: Text,
|
||||
encoding :: PlatformEncoding,
|
||||
encryptFiles :: Bool -- if the host encrypts files in app storage
|
||||
encryptFiles :: Bool, -- if the host encrypts files in app storage
|
||||
compression :: BoolDef
|
||||
}
|
||||
|
||||
$(J.deriveJSON defaultJSON ''RemoteFile)
|
||||
|
||||
@@ -99,18 +99,10 @@ module Simplex.Chat.Store.Groups
|
||||
deleteGroupMember,
|
||||
deleteGroupMemberConnection,
|
||||
updateGroupMemberRole,
|
||||
createIntroductions,
|
||||
createIntrosOrUpdateVectors,
|
||||
setMemberVectorNewRelations,
|
||||
setMembersVectorsNewRelation,
|
||||
setMemberVectorRelationConnected,
|
||||
migrateGetMemberRelationsVector,
|
||||
migrateMemberRelationsVector,
|
||||
migrateMemberRelationsVector',
|
||||
getMemberRelationsVector_,
|
||||
updateIntroStatus,
|
||||
getIntroduction,
|
||||
getIntroducedGroupMemberIds,
|
||||
getMemberRelationsVector,
|
||||
createIntroReMember,
|
||||
createIntroToMemberContact,
|
||||
getMatchingContacts,
|
||||
@@ -151,8 +143,6 @@ module Simplex.Chat.Store.Groups
|
||||
setGroupChatTTL,
|
||||
getGroupChatTTL,
|
||||
getUserGroupsToExpire,
|
||||
hasMembersWithoutVector,
|
||||
getGMsWithoutVectorIds,
|
||||
updateGroupAlias,
|
||||
)
|
||||
where
|
||||
@@ -166,7 +156,6 @@ import Data.ByteString (ByteString)
|
||||
import qualified Data.ByteString as B
|
||||
import Data.Char (toLower)
|
||||
import Data.Either (rights)
|
||||
import Data.Foldable (foldrM)
|
||||
import Data.Int (Int64)
|
||||
import Data.List (partition, sortOn)
|
||||
import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing)
|
||||
@@ -1609,75 +1598,6 @@ updateGroupMemberRole :: DB.Connection -> User -> GroupMember -> GroupMemberRole
|
||||
updateGroupMemberRole db User {userId} GroupMember {groupMemberId} memRole =
|
||||
DB.execute db "UPDATE group_members SET member_role = ? WHERE user_id = ? AND group_member_id = ?" (memRole, userId, groupMemberId)
|
||||
|
||||
createIntroductions :: DB.Connection -> VersionChat -> [GroupMember] -> GroupMember -> IO [GroupMember]
|
||||
createIntroductions db chatV reMembers toMember
|
||||
| null reMembers = pure []
|
||||
| otherwise = do
|
||||
currentTs <- getCurrentTime
|
||||
catMaybes <$> mapM (createIntro_ currentTs) reMembers
|
||||
where
|
||||
createIntro_ :: UTCTime -> GroupMember -> IO (Maybe GroupMember)
|
||||
createIntro_ ts reMember =
|
||||
-- when members connect concurrently, host would try to create introductions between them in both directions;
|
||||
-- this check avoids creating second (redundant) introduction
|
||||
checkInverseIntro >>= \case
|
||||
Just _ -> pure Nothing
|
||||
Nothing -> do
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
INSERT INTO group_member_intros
|
||||
(re_group_member_id, to_group_member_id, intro_status, intro_chat_protocol_version, created_at, updated_at)
|
||||
VALUES (?,?,?,?,?,?)
|
||||
|]
|
||||
(groupMemberId' reMember, groupMemberId' toMember, GMIntroPending, chatV, ts, ts)
|
||||
pure $ Just reMember
|
||||
where
|
||||
checkInverseIntro :: IO (Maybe Int64)
|
||||
checkInverseIntro =
|
||||
maybeFirstRow fromOnly $
|
||||
DB.query
|
||||
db
|
||||
"SELECT 1 FROM group_member_intros WHERE re_group_member_id = ? AND to_group_member_id = ? LIMIT 1"
|
||||
(groupMemberId' toMember, groupMemberId' reMember)
|
||||
|
||||
-- Create introductions for members without vectors and update vectors for members with vectors.
|
||||
-- Partitioning and updates happen in same transaction to avoid race conditions.
|
||||
createIntrosOrUpdateVectors :: DB.Connection -> VersionRangeChat -> [GroupMember] -> GroupMember -> IO [GroupMember]
|
||||
createIntrosOrUpdateVectors db vr reMembers toMember
|
||||
| null reMembers = pure []
|
||||
| otherwise = do
|
||||
(memsWithVec, memsWithoutVec) <- partitionByVector reMembers
|
||||
let GroupMember {indexInGroup} = toMember
|
||||
setMembersVectorsNewRelation db memsWithVec indexInGroup IDSubjectIntroduced MRIntroduced
|
||||
memsWithoutVec' <- createIntroductions db (maxVersion vr) memsWithoutVec toMember
|
||||
pure $ memsWithoutVec' <> memsWithVec
|
||||
where
|
||||
partitionByVector :: [GroupMember] -> IO ([GroupMember], [GroupMember])
|
||||
#if defined(dbPostgres)
|
||||
partitionByVector members = do
|
||||
let memberIds = map groupMemberId' members
|
||||
-- Lock rows first to ensure partitioning doesn't change in case of concurrent updates
|
||||
_ :: [Only Int] <-
|
||||
DB.query
|
||||
db
|
||||
"SELECT 1 FROM group_members WHERE group_member_id IN ? FOR UPDATE"
|
||||
(Only $ In memberIds)
|
||||
memberIdsWithVec <- S.fromList . map fromOnly <$>
|
||||
DB.query
|
||||
db
|
||||
"SELECT group_member_id FROM group_members WHERE group_member_id IN ? AND member_relations_vector IS NOT NULL"
|
||||
(Only $ In memberIds)
|
||||
pure $ partition (\m -> groupMemberId' m `S.member` memberIdsWithVec) members
|
||||
#else
|
||||
partitionByVector = foldrM checkMember ([], [])
|
||||
where
|
||||
checkMember m (withVec, withoutVec) = do
|
||||
hasVec <- isJust <$> maybeFirstRow fromOnly
|
||||
(DB.query db "SELECT 1 FROM group_members WHERE group_member_id = ? AND member_relations_vector IS NOT NULL" (Only $ groupMemberId' m) :: IO [Only Int64])
|
||||
pure $ if hasVec then (m : withVec, withoutVec) else (withVec, m : withoutVec)
|
||||
#endif
|
||||
|
||||
setMemberVectorNewRelations :: DB.Connection -> GroupMember -> [(Int64, (IntroductionDirection, MemberRelation))] -> IO ()
|
||||
setMemberVectorNewRelations db GroupMember {groupMemberId} relations = do
|
||||
v_ <- maybeFirstRow fromOnly $
|
||||
@@ -1742,100 +1662,14 @@ setMemberVectorRelationConnected db GroupMember {groupMemberId} GroupMember {ind
|
||||
|]
|
||||
(Binary v', currentTs, groupMemberId)
|
||||
|
||||
migrateGetMemberRelationsVector :: DB.Connection -> GroupMember -> ExceptT StoreError IO ByteString
|
||||
migrateGetMemberRelationsVector db m@GroupMember {groupMemberId} = do
|
||||
liftIO $ migrateMemberRelationsVector db m
|
||||
getMemberRelationsVector :: DB.Connection -> GroupMember -> ExceptT StoreError IO ByteString
|
||||
getMemberRelationsVector db GroupMember {groupMemberId} =
|
||||
ExceptT . firstRow fromOnly (SEGroupMemberNotFound groupMemberId) $
|
||||
DB.query
|
||||
db
|
||||
"SELECT member_relations_vector FROM group_members WHERE group_member_id = ?"
|
||||
(Only groupMemberId)
|
||||
|
||||
migrateMemberRelationsVector :: DB.Connection -> GroupMember -> IO ()
|
||||
migrateMemberRelationsVector db GroupMember {groupMemberId} =
|
||||
migrateMemberRelationsVector' db groupMemberId
|
||||
|
||||
migrateMemberRelationsVector' :: DB.Connection -> GroupMemberId -> IO ()
|
||||
migrateMemberRelationsVector' db groupMemberId = do
|
||||
currentTs <- liftIO getCurrentTime
|
||||
liftIO $ do
|
||||
#if defined(dbPostgres)
|
||||
-- Lock the row first to ensure computation runs only after lock is acquired
|
||||
_ :: [Only Int] <-
|
||||
DB.query
|
||||
db
|
||||
"SELECT 1 FROM group_members WHERE group_member_id = ? AND member_relations_vector IS NULL FOR UPDATE"
|
||||
(Only groupMemberId)
|
||||
#endif
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
UPDATE group_members
|
||||
SET
|
||||
member_relations_vector = (
|
||||
SELECT migrate_relations_vector(idx, direction, intro_status)
|
||||
FROM (
|
||||
SELECT m.index_in_group AS idx, 0 AS direction, i.intro_status
|
||||
FROM group_member_intros i
|
||||
JOIN group_members m ON m.group_member_id = i.to_group_member_id
|
||||
WHERE i.re_group_member_id = group_members.group_member_id
|
||||
UNION ALL
|
||||
SELECT m.index_in_group AS idx, 1 AS direction, i.intro_status
|
||||
FROM group_member_intros i
|
||||
JOIN group_members m ON m.group_member_id = i.re_group_member_id
|
||||
WHERE i.to_group_member_id = group_members.group_member_id
|
||||
) AS relations
|
||||
),
|
||||
updated_at = ?
|
||||
WHERE group_member_id = ?
|
||||
AND member_relations_vector IS NULL
|
||||
|]
|
||||
(currentTs, groupMemberId)
|
||||
|
||||
getMemberRelationsVector_ :: DB.Connection -> GroupMember -> IO (Maybe ByteString)
|
||||
getMemberRelationsVector_ db GroupMember {groupMemberId} =
|
||||
maybeFirstRow fromOnly $
|
||||
DB.query
|
||||
db
|
||||
"SELECT member_relations_vector FROM group_members WHERE group_member_id = ?"
|
||||
(Only groupMemberId)
|
||||
|
||||
updateIntroStatus :: DB.Connection -> Int64 -> GroupMemberIntroStatus -> IO ()
|
||||
updateIntroStatus db introId introStatus = do
|
||||
currentTs <- getCurrentTime
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
UPDATE group_member_intros
|
||||
SET intro_status = ?, updated_at = ?
|
||||
WHERE group_member_intro_id = ?
|
||||
|]
|
||||
(introStatus, currentTs, introId)
|
||||
|
||||
getIntroduction :: DB.Connection -> GroupMember -> GroupMember -> IO (Maybe GroupMemberIntro)
|
||||
getIntroduction db reMember toMember =
|
||||
maybeFirstRow toIntro $
|
||||
DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT group_member_intro_id, intro_status
|
||||
FROM group_member_intros
|
||||
WHERE re_group_member_id = ? AND to_group_member_id = ?
|
||||
|]
|
||||
(groupMemberId' reMember, groupMemberId' toMember)
|
||||
where
|
||||
toIntro :: (Int64, GroupMemberIntroStatus) -> GroupMemberIntro
|
||||
toIntro (introId, introStatus) =
|
||||
GroupMemberIntro {introId, reMember, toMember, introStatus}
|
||||
|
||||
getIntroducedGroupMemberIds :: DB.Connection -> GroupMember -> IO [GroupMemberId]
|
||||
getIntroducedGroupMemberIds db invitee =
|
||||
map fromOnly <$>
|
||||
DB.query
|
||||
db
|
||||
"SELECT re_group_member_id FROM group_member_intros WHERE to_group_member_id = ?"
|
||||
(Only $ groupMemberId' invitee)
|
||||
|
||||
createIntroReMember :: DB.Connection -> User -> GroupInfo -> GroupMember -> VersionChat -> MemberInfo -> Maybe MemberRestrictions -> (CommandId, ConnId) -> SubscriptionMode -> ExceptT StoreError IO GroupMember
|
||||
createIntroReMember
|
||||
db
|
||||
@@ -2725,25 +2559,6 @@ getUserGroupsToExpire db User {userId} globalTTL =
|
||||
where
|
||||
cond = if globalTTL == 0 then "" else " OR chat_item_ttl IS NULL"
|
||||
|
||||
hasMembersWithoutVector :: DB.Connection -> IO Bool
|
||||
hasMembersWithoutVector db =
|
||||
fromOnly . head
|
||||
<$> DB.query_
|
||||
db
|
||||
"SELECT EXISTS (SELECT 1 FROM group_members WHERE member_relations_vector IS NULL LIMIT 1)"
|
||||
|
||||
getGMsWithoutVectorIds :: DB.Connection -> IO [GroupMemberId]
|
||||
getGMsWithoutVectorIds db =
|
||||
map fromOnly <$>
|
||||
DB.query_
|
||||
db
|
||||
[sql|
|
||||
SELECT group_member_id
|
||||
FROM group_members
|
||||
WHERE member_relations_vector IS NULL
|
||||
LIMIT 1000
|
||||
|]
|
||||
|
||||
updateGroupAlias :: DB.Connection -> UserId -> GroupInfo -> LocalAlias -> IO GroupInfo
|
||||
updateGroupAlias db userId g@GroupInfo {groupId} localAlias = do
|
||||
updatedAt <- getCurrentTime
|
||||
|
||||
@@ -335,24 +335,24 @@ updateSndMsgDeliveryStatus db connId agentMsgId sndMsgDeliveryStatus = do
|
||||
|]
|
||||
(sndMsgDeliveryStatus, currentTs, connId, agentMsgId)
|
||||
|
||||
createPendingGroupMessage :: DB.Connection -> Int64 -> MessageId -> Maybe Int64 -> IO ()
|
||||
createPendingGroupMessage db groupMemberId messageId introId_ = do
|
||||
createPendingGroupMessage :: DB.Connection -> Int64 -> MessageId -> IO ()
|
||||
createPendingGroupMessage db groupMemberId messageId = do
|
||||
currentTs <- getCurrentTime
|
||||
DB.execute
|
||||
db
|
||||
[sql|
|
||||
INSERT INTO pending_group_messages
|
||||
(group_member_id, message_id, group_member_intro_id, created_at, updated_at) VALUES (?,?,?,?,?)
|
||||
(group_member_id, message_id, created_at, updated_at) VALUES (?,?,?,?)
|
||||
|]
|
||||
(groupMemberId, messageId, introId_, currentTs, currentTs)
|
||||
(groupMemberId, messageId, currentTs, currentTs)
|
||||
|
||||
getPendingGroupMessages :: DB.Connection -> Int64 -> IO [(SndMessage, ACMEventTag, Maybe Int64)]
|
||||
getPendingGroupMessages :: DB.Connection -> Int64 -> IO [SndMessage]
|
||||
getPendingGroupMessages db groupMemberId =
|
||||
map pendingGroupMessage
|
||||
<$> DB.query
|
||||
db
|
||||
[sql|
|
||||
SELECT pgm.message_id, m.shared_msg_id, m.msg_body, m.chat_msg_event, pgm.group_member_intro_id
|
||||
SELECT pgm.message_id, m.shared_msg_id, m.msg_body
|
||||
FROM pending_group_messages pgm
|
||||
JOIN messages m USING (message_id)
|
||||
WHERE pgm.group_member_id = ?
|
||||
@@ -360,8 +360,8 @@ getPendingGroupMessages db groupMemberId =
|
||||
|]
|
||||
(Only groupMemberId)
|
||||
where
|
||||
pendingGroupMessage (msgId, sharedMsgId, msgBody, cmEventTag, introId_) =
|
||||
(SndMessage {msgId, sharedMsgId, msgBody}, cmEventTag, introId_)
|
||||
pendingGroupMessage (msgId, sharedMsgId, msgBody) =
|
||||
SndMessage {msgId, sharedMsgId, msgBody}
|
||||
|
||||
deletePendingGroupMessage :: DB.Connection -> Int64 -> MessageId -> IO ()
|
||||
deletePendingGroupMessage db groupMemberId messageId =
|
||||
|
||||
@@ -22,7 +22,7 @@ import Simplex.Chat.Store.Postgres.Migrations.M20250922_remove_unused_connection
|
||||
import Simplex.Chat.Store.Postgres.Migrations.M20251007_connections_sync
|
||||
import Simplex.Chat.Store.Postgres.Migrations.M20251017_chat_tags_cascade
|
||||
import Simplex.Chat.Store.Postgres.Migrations.M20251117_member_relations_vector
|
||||
-- import Simplex.Chat.Store.Postgres.Migrations.M20251128_member_relations_vector_stage_2
|
||||
import Simplex.Chat.Store.Postgres.Migrations.M20251128_migrate_member_relations
|
||||
import Simplex.Chat.Store.Postgres.Migrations.M20251212_chat_relays
|
||||
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
|
||||
|
||||
@@ -46,7 +46,7 @@ schemaMigrations =
|
||||
("20251007_connections_sync", m20251007_connections_sync, Just down_m20251007_connections_sync),
|
||||
("20251017_chat_tags_cascade", m20251017_chat_tags_cascade, Just down_m20251017_chat_tags_cascade),
|
||||
("20251117_member_relations_vector", m20251117_member_relations_vector, Just down_m20251117_member_relations_vector),
|
||||
-- ("20251128_member_relations_vector_stage_2", m20251128_member_relations_vector_stage_2, Just down_m20251128_member_relations_vector_stage_2)
|
||||
("20251128_migrate_member_relations", m20251128_migrate_member_relations, Just down_m20251128_migrate_member_relations),
|
||||
("20251212_chat_relays", m20251212_chat_relays, Just down_m20251212_chat_relays)
|
||||
]
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ import qualified Data.Text as T
|
||||
import Text.RawString.QQ (r)
|
||||
|
||||
-- This migration creates custom aggregate function migrate_relations_vector(idx, direction, intro_status).
|
||||
-- Used in live migration and stage 2 migration (M20251128_member_relations_vector_stage_2).
|
||||
-- Used in live migration and stage 2 migration (M20251128_migrate_member_relations).
|
||||
--
|
||||
-- Vector byte encoding: 4 reserved | 1 direction | 3 status
|
||||
-- Direction: 0 = IDSubjectIntroduced, 1 = IDReferencedIntroduced
|
||||
|
||||
+13
-13
@@ -1,9 +1,9 @@
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.Chat.Store.Postgres.Migrations.M20251128_member_relations_vector_stage_2 where
|
||||
module Simplex.Chat.Store.Postgres.Migrations.M20251128_migrate_member_relations where
|
||||
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Text.RawString.QQ (r)
|
||||
|
||||
-- Build member_relations_vector for all members that don't have it yet.
|
||||
@@ -13,11 +13,9 @@ import Text.RawString.QQ (r)
|
||||
-- - direction 0 (IDSubjectIntroduced): current member (subject) is re_group_member_id, was introduced to referenced member
|
||||
-- - direction 1 (IDReferencedIntroduced): current member (subject) is to_group_member_id, referenced member was introduced to it
|
||||
|
||||
-- TODO [relations vector] drop group_member_intros in the end of migration
|
||||
m20251128_member_relations_vector_stage_2 :: Text
|
||||
m20251128_member_relations_vector_stage_2 =
|
||||
T.pack
|
||||
[r|
|
||||
m20251128_migrate_member_relations :: Text
|
||||
m20251128_migrate_member_relations =
|
||||
[r|
|
||||
UPDATE group_members
|
||||
SET member_relations_vector = (
|
||||
SELECT migrate_relations_vector(idx, direction, intro_status)
|
||||
@@ -34,12 +32,14 @@ SET member_relations_vector = (
|
||||
) AS relations
|
||||
)
|
||||
WHERE member_relations_vector IS NULL;
|
||||
|
||||
DROP INDEX idx_pending_group_messages_group_member_intro_id;
|
||||
ALTER TABLE pending_group_messages DROP COLUMN group_member_intro_id;
|
||||
|]
|
||||
|
||||
-- TODO [relations vector] re-create group_member_intros
|
||||
down_m20251128_member_relations_vector_stage_2 :: Text
|
||||
down_m20251128_member_relations_vector_stage_2 =
|
||||
T.pack
|
||||
[r|
|
||||
|
||||
down_m20251128_migrate_member_relations :: Text
|
||||
down_m20251128_migrate_member_relations =
|
||||
[r|
|
||||
ALTER TABLE pending_group_messages ADD COLUMN group_member_intro_id BIGINT REFERENCES group_member_intros ON DELETE CASCADE;
|
||||
CREATE INDEX idx_pending_group_messages_group_member_intro_id ON pending_group_messages(group_member_intro_id);
|
||||
|]
|
||||
@@ -1060,7 +1060,6 @@ CREATE TABLE test_chat_schema.pending_group_messages (
|
||||
pending_group_message_id bigint NOT NULL,
|
||||
group_member_id bigint NOT NULL,
|
||||
message_id bigint NOT NULL,
|
||||
group_member_intro_id bigint,
|
||||
created_at timestamp with time zone DEFAULT now() NOT NULL,
|
||||
updated_at timestamp with time zone DEFAULT now() NOT NULL
|
||||
);
|
||||
@@ -2321,10 +2320,6 @@ CREATE INDEX idx_pending_group_messages_group_member_id ON test_chat_schema.pend
|
||||
|
||||
|
||||
|
||||
CREATE INDEX idx_pending_group_messages_group_member_intro_id ON test_chat_schema.pending_group_messages USING btree (group_member_intro_id);
|
||||
|
||||
|
||||
|
||||
CREATE INDEX idx_pending_group_messages_message_id ON test_chat_schema.pending_group_messages USING btree (message_id);
|
||||
|
||||
|
||||
@@ -2991,11 +2986,6 @@ ALTER TABLE ONLY test_chat_schema.pending_group_messages
|
||||
|
||||
|
||||
|
||||
ALTER TABLE ONLY test_chat_schema.pending_group_messages
|
||||
ADD CONSTRAINT pending_group_messages_group_member_intro_id_fkey FOREIGN KEY (group_member_intro_id) REFERENCES test_chat_schema.group_member_intros(group_member_intro_id) ON DELETE CASCADE;
|
||||
|
||||
|
||||
|
||||
ALTER TABLE ONLY test_chat_schema.pending_group_messages
|
||||
ADD CONSTRAINT pending_group_messages_message_id_fkey FOREIGN KEY (message_id) REFERENCES test_chat_schema.messages(message_id) ON DELETE CASCADE;
|
||||
|
||||
|
||||
@@ -145,7 +145,7 @@ import Simplex.Chat.Store.SQLite.Migrations.M20250922_remove_unused_connections
|
||||
import Simplex.Chat.Store.SQLite.Migrations.M20251007_connections_sync
|
||||
import Simplex.Chat.Store.SQLite.Migrations.M20251017_chat_tags_cascade
|
||||
import Simplex.Chat.Store.SQLite.Migrations.M20251117_member_relations_vector
|
||||
-- import Simplex.Chat.Store.SQLite.Migrations.M20251128_member_relations_vector_stage_2
|
||||
import Simplex.Chat.Store.SQLite.Migrations.M20251128_migrate_member_relations
|
||||
import Simplex.Chat.Store.SQLite.Migrations.M20251212_chat_relays
|
||||
import Simplex.Messaging.Agent.Store.Shared (Migration (..))
|
||||
|
||||
@@ -292,7 +292,7 @@ schemaMigrations =
|
||||
("20251007_connections_sync", m20251007_connections_sync, Just down_m20251007_connections_sync),
|
||||
("20251017_chat_tags_cascade", m20251017_chat_tags_cascade, Just down_m20251017_chat_tags_cascade),
|
||||
("20251117_member_relations_vector", m20251117_member_relations_vector, Just down_m20251117_member_relations_vector),
|
||||
-- ("20251128_member_relations_vector_stage_2", m20251128_member_relations_vector_stage_2, Just down_m20251128_member_relations_vector_stage_2)
|
||||
("20251128_migrate_member_relations", m20251128_migrate_member_relations, Just down_m20251128_migrate_member_relations),
|
||||
("20251212_chat_relays", m20251212_chat_relays, Just down_m20251212_chat_relays)
|
||||
]
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Util (SQLiteFunc, SQLiteFuncFinal, m
|
||||
|
||||
-- This module defines custom aggregate function migrate_relations_vector(idx, direction, intro_status).
|
||||
-- It is passed via DBOpts and registered on DB open.
|
||||
-- Used in live migration and stage 2 migration (M20251128_member_relations_vector_stage_2).
|
||||
-- Used in live migration and stage 2 migration (M20251128_migrate_member_relations).
|
||||
--
|
||||
-- Vector byte encoding: 4 reserved | 1 direction | 3 status
|
||||
-- Direction: 0 = IDSubjectIntroduced, 1 = IDReferencedIntroduced
|
||||
|
||||
+10
-8
@@ -1,6 +1,6 @@
|
||||
{-# LANGUAGE QuasiQuotes #-}
|
||||
|
||||
module Simplex.Chat.Store.SQLite.Migrations.M20251128_member_relations_vector_stage_2 where
|
||||
module Simplex.Chat.Store.SQLite.Migrations.M20251128_migrate_member_relations where
|
||||
|
||||
import Database.SQLite.Simple (Query)
|
||||
import Database.SQLite.Simple.QQ (sql)
|
||||
@@ -12,9 +12,8 @@ import Database.SQLite.Simple.QQ (sql)
|
||||
-- - direction 0 (IDSubjectIntroduced): current member (subject) is re_group_member_id, was introduced to referenced member
|
||||
-- - direction 1 (IDReferencedIntroduced): current member (subject) is to_group_member_id, referenced member was introduced to it
|
||||
|
||||
-- TODO [relations vector] drop group_member_intros in the end of migration
|
||||
m20251128_member_relations_vector_stage_2 :: Query
|
||||
m20251128_member_relations_vector_stage_2 =
|
||||
m20251128_migrate_member_relations :: Query
|
||||
m20251128_migrate_member_relations =
|
||||
[sql|
|
||||
UPDATE group_members
|
||||
SET member_relations_vector = (
|
||||
@@ -32,11 +31,14 @@ SET member_relations_vector = (
|
||||
)
|
||||
)
|
||||
WHERE member_relations_vector IS NULL;
|
||||
|
||||
DROP INDEX idx_pending_group_messages_group_member_intro_id;
|
||||
ALTER TABLE pending_group_messages DROP COLUMN group_member_intro_id;
|
||||
|]
|
||||
|
||||
-- TODO [relations vector] re-create group_member_intros
|
||||
down_m20251128_member_relations_vector_stage_2 :: Query
|
||||
down_m20251128_member_relations_vector_stage_2 =
|
||||
down_m20251128_migrate_member_relations :: Query
|
||||
down_m20251128_migrate_member_relations =
|
||||
[sql|
|
||||
|
||||
ALTER TABLE pending_group_messages ADD COLUMN group_member_intro_id INTEGER REFERENCES group_member_intros ON DELETE CASCADE;
|
||||
CREATE INDEX idx_pending_group_messages_group_member_intro_id ON pending_group_messages(group_member_intro_id);
|
||||
|]
|
||||
@@ -882,7 +882,7 @@ Query:
|
||||
FROM rcv_queues q
|
||||
JOIN servers s ON q.host = s.host AND q.port = s.port
|
||||
JOIN connections c ON q.conn_id = c.conn_id
|
||||
WHERE c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ?
|
||||
WHERE c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ? AND COALESCE(q.server_key_hash, s.key_hash) = ?
|
||||
Plan:
|
||||
SEARCH s USING PRIMARY KEY (host=? AND port=?)
|
||||
SEARCH q USING PRIMARY KEY (host=? AND port=?)
|
||||
@@ -894,7 +894,7 @@ Query:
|
||||
FROM rcv_queues q
|
||||
JOIN servers s ON q.host = s.host AND q.port = s.port
|
||||
JOIN connections c ON q.conn_id = c.conn_id
|
||||
WHERE q.to_subscribe = 1 AND c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ?
|
||||
WHERE q.to_subscribe = 1 AND c.deleted = 0 AND q.deleted = 0 AND c.user_id = ? AND q.host = ? AND q.port = ? AND COALESCE(q.server_key_hash, s.key_hash) = ?
|
||||
Plan:
|
||||
SEARCH q USING INDEX idx_rcv_queues_to_subscribe (to_subscribe=? AND host=? AND port=?)
|
||||
SEARCH c USING PRIMARY KEY (conn_id=?)
|
||||
|
||||
@@ -3444,14 +3444,6 @@ Query:
|
||||
Plan:
|
||||
SEARCH chat_item_reactions USING INDEX idx_chat_item_reactions_group (group_id=? AND shared_msg_id=?)
|
||||
|
||||
Query:
|
||||
SELECT group_member_intro_id, intro_status
|
||||
FROM group_member_intros
|
||||
WHERE re_group_member_id = ? AND to_group_member_id = ?
|
||||
|
||||
Plan:
|
||||
SEARCH group_member_intros USING INDEX sqlite_autoindex_group_member_intros_1 (re_group_member_id=? AND to_group_member_id=?)
|
||||
|
||||
Query:
|
||||
SELECT group_scope_tag, group_scope_group_member_id
|
||||
FROM chat_items
|
||||
@@ -3501,7 +3493,7 @@ SEARCH m USING INDEX idx_group_members_user_id (user_id=?)
|
||||
SEARCH p USING INTEGER PRIMARY KEY (rowid=?)
|
||||
|
||||
Query:
|
||||
SELECT pgm.message_id, m.shared_msg_id, m.msg_body, m.chat_msg_event, pgm.group_member_intro_id
|
||||
SELECT pgm.message_id, m.shared_msg_id, m.msg_body
|
||||
FROM pending_group_messages pgm
|
||||
JOIN messages m USING (message_id)
|
||||
WHERE pgm.group_member_id = ?
|
||||
@@ -3732,52 +3724,6 @@ SEARCH connections USING INDEX idx_connections_group_member_id (group_member_id=
|
||||
LIST SUBQUERY 1
|
||||
SCAN group_members USING COVERING INDEX idx_group_members_user_id_local_display_name
|
||||
|
||||
Query:
|
||||
UPDATE group_member_intros SET intro_status='fwd'
|
||||
WHERE re_group_member_id IN (SELECT group_member_id FROM group_members WHERE local_display_name = ?)
|
||||
AND to_group_member_id IN (SELECT group_member_id FROM group_members WHERE local_display_name = ?)
|
||||
|
||||
Plan:
|
||||
SEARCH group_member_intros USING INDEX sqlite_autoindex_group_member_intros_1 (re_group_member_id=? AND to_group_member_id=?)
|
||||
LIST SUBQUERY 1
|
||||
SCAN group_members USING COVERING INDEX idx_group_members_user_id_local_display_name
|
||||
LIST SUBQUERY 2
|
||||
SCAN group_members USING COVERING INDEX idx_group_members_user_id_local_display_name
|
||||
|
||||
Query:
|
||||
UPDATE group_members
|
||||
SET
|
||||
member_relations_vector = (
|
||||
SELECT migrate_relations_vector(idx, direction, intro_status)
|
||||
FROM (
|
||||
SELECT m.index_in_group AS idx, 0 AS direction, i.intro_status
|
||||
FROM group_member_intros i
|
||||
JOIN group_members m ON m.group_member_id = i.to_group_member_id
|
||||
WHERE i.re_group_member_id = group_members.group_member_id
|
||||
UNION ALL
|
||||
SELECT m.index_in_group AS idx, 1 AS direction, i.intro_status
|
||||
FROM group_member_intros i
|
||||
JOIN group_members m ON m.group_member_id = i.re_group_member_id
|
||||
WHERE i.to_group_member_id = group_members.group_member_id
|
||||
) AS relations
|
||||
),
|
||||
updated_at = ?
|
||||
WHERE group_member_id = ?
|
||||
AND member_relations_vector IS NULL
|
||||
|
||||
Plan:
|
||||
SEARCH group_members USING INTEGER PRIMARY KEY (rowid=?)
|
||||
CORRELATED SCALAR SUBQUERY 3
|
||||
CO-ROUTINE relations
|
||||
COMPOUND QUERY
|
||||
LEFT-MOST SUBQUERY
|
||||
SEARCH i USING INDEX idx_group_member_intros_re_group_member_id (re_group_member_id=?)
|
||||
SEARCH m USING INTEGER PRIMARY KEY (rowid=?)
|
||||
UNION ALL
|
||||
SEARCH i USING INDEX idx_group_member_intros_to_group_member_id (to_group_member_id=?)
|
||||
SEARCH m USING INTEGER PRIMARY KEY (rowid=?)
|
||||
SCAN relations
|
||||
|
||||
Query:
|
||||
UPDATE group_members
|
||||
SET contact_id = ?, local_display_name = ?, contact_profile_id = ?, updated_at = ?
|
||||
@@ -4449,7 +4395,7 @@ Plan:
|
||||
|
||||
Query:
|
||||
INSERT INTO pending_group_messages
|
||||
(group_member_id, message_id, group_member_intro_id, created_at, updated_at) VALUES (?,?,?,?,?)
|
||||
(group_member_id, message_id, created_at, updated_at) VALUES (?,?,?,?)
|
||||
|
||||
Plan:
|
||||
|
||||
@@ -6111,10 +6057,6 @@ Plan:
|
||||
Query: INSERT INTO xftp_file_descriptions (user_id, file_descr_text, file_descr_part_no, file_descr_complete, created_at, updated_at) VALUES (?,?,?,?,?,?)
|
||||
Plan:
|
||||
|
||||
Query: SELECT 1 FROM group_members WHERE group_member_id = ? AND member_relations_vector IS NOT NULL
|
||||
Plan:
|
||||
SEARCH group_members USING INTEGER PRIMARY KEY (rowid=?)
|
||||
|
||||
Query: SELECT 1 FROM settings WHERE user_id = ? LIMIT 1
|
||||
Plan:
|
||||
SEARCH settings USING COVERING INDEX idx_settings_user_id (user_id=?)
|
||||
@@ -6145,12 +6087,6 @@ SCAN CONSTANT ROW
|
||||
SCALAR SUBQUERY 1
|
||||
SEARCH chat_items USING COVERING INDEX idx_chat_items_contacts_created_at (user_id=? AND contact_id=?)
|
||||
|
||||
Query: SELECT EXISTS (SELECT 1 FROM group_members WHERE member_relations_vector IS NULL LIMIT 1)
|
||||
Plan:
|
||||
SCAN CONSTANT ROW
|
||||
SCALAR SUBQUERY 1
|
||||
SCAN group_members
|
||||
|
||||
Query: SELECT accepted_at FROM operator_usage_conditions WHERE server_operator_id = ? AND conditions_commit = ?
|
||||
Plan:
|
||||
SEARCH operator_usage_conditions USING INDEX idx_operator_usage_conditions_conditions_commit (conditions_commit=? AND server_operator_id=?)
|
||||
@@ -6531,14 +6467,6 @@ Query: UPDATE files SET private_snd_file_descr = ?, updated_at = ? WHERE user_id
|
||||
Plan:
|
||||
SEARCH files USING INTEGER PRIMARY KEY (rowid=?)
|
||||
|
||||
Query: UPDATE group_member_intros SET intro_status='con'
|
||||
Plan:
|
||||
SCAN group_member_intros
|
||||
|
||||
Query: UPDATE group_member_intros SET intro_status='fwd'
|
||||
Plan:
|
||||
SCAN group_member_intros
|
||||
|
||||
Query: UPDATE group_members SET contact_id = ?, updated_at = ? WHERE contact_profile_id = ?
|
||||
Plan:
|
||||
SEARCH group_members USING COVERING INDEX idx_group_members_contact_profile_id (contact_profile_id=?)
|
||||
|
||||
@@ -395,7 +395,6 @@ CREATE TABLE pending_group_messages(
|
||||
pending_group_message_id INTEGER PRIMARY KEY,
|
||||
group_member_id INTEGER NOT NULL REFERENCES group_members ON DELETE CASCADE,
|
||||
message_id INTEGER NOT NULL REFERENCES messages ON DELETE CASCADE,
|
||||
group_member_intro_id INTEGER REFERENCES group_member_intros ON DELETE CASCADE,
|
||||
created_at TEXT NOT NULL DEFAULT(datetime('now')),
|
||||
updated_at TEXT NOT NULL DEFAULT(datetime('now'))
|
||||
);
|
||||
@@ -820,9 +819,6 @@ CREATE INDEX idx_group_profiles_user_id ON group_profiles(user_id);
|
||||
CREATE INDEX idx_groups_chat_item_id ON groups(chat_item_id);
|
||||
CREATE INDEX idx_groups_group_profile_id ON groups(group_profile_id);
|
||||
CREATE INDEX idx_messages_group_id ON messages(group_id);
|
||||
CREATE INDEX idx_pending_group_messages_group_member_intro_id ON pending_group_messages(
|
||||
group_member_intro_id
|
||||
);
|
||||
CREATE INDEX idx_pending_group_messages_message_id ON pending_group_messages(
|
||||
message_id
|
||||
);
|
||||
|
||||
@@ -1752,49 +1752,6 @@ instance TextEncoding ConnType where
|
||||
ConnMember -> "member"
|
||||
ConnUserContact -> "user_contact"
|
||||
|
||||
data GroupMemberIntro = GroupMemberIntro
|
||||
{ introId :: Int64,
|
||||
reMember :: GroupMember,
|
||||
toMember :: GroupMember,
|
||||
introStatus :: GroupMemberIntroStatus
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
data GroupMemberIntroStatus
|
||||
= GMIntroPending
|
||||
| GMIntroSent
|
||||
| GMIntroInvReceived
|
||||
| GMIntroInvForwarded
|
||||
| GMIntroReConnected
|
||||
| GMIntroToConnected
|
||||
| GMIntroConnected
|
||||
deriving (Eq, Show)
|
||||
|
||||
instance FromField GroupMemberIntroStatus where fromField = fromTextField_ introStatusT
|
||||
|
||||
instance ToField GroupMemberIntroStatus where toField = toField . serializeIntroStatus
|
||||
|
||||
introStatusT :: Text -> Maybe GroupMemberIntroStatus
|
||||
introStatusT = \case
|
||||
"new" -> Just GMIntroPending
|
||||
"sent" -> Just GMIntroSent
|
||||
"rcv" -> Just GMIntroInvReceived
|
||||
"fwd" -> Just GMIntroInvForwarded
|
||||
"re-con" -> Just GMIntroReConnected
|
||||
"to-con" -> Just GMIntroToConnected
|
||||
"con" -> Just GMIntroConnected
|
||||
_ -> Nothing
|
||||
|
||||
serializeIntroStatus :: GroupMemberIntroStatus -> Text
|
||||
serializeIntroStatus = \case
|
||||
GMIntroPending -> "new"
|
||||
GMIntroSent -> "sent"
|
||||
GMIntroInvReceived -> "rcv"
|
||||
GMIntroInvForwarded -> "fwd"
|
||||
GMIntroReConnected -> "re-con"
|
||||
GMIntroToConnected -> "to-con"
|
||||
GMIntroConnected -> "con"
|
||||
|
||||
type CommandId = Int64
|
||||
|
||||
aCorrId :: CommandId -> ACorrId
|
||||
|
||||
@@ -259,11 +259,10 @@ chatResponseToView hu cfg@ChatConfig {logLevel, showReactions, testView} liveIte
|
||||
rhi_
|
||||
]
|
||||
CRRemoteHostList hs -> viewRemoteHosts hs
|
||||
CRRemoteHostStarted {remoteHost_, invitation, localAddrs = RCCtrlAddress {address} :| _, ctrlPort} ->
|
||||
[ plain $ maybe ("new remote host" <> started) (\RemoteHostInfo {remoteHostId = rhId} -> "remote host " <> show rhId <> started) remoteHost_,
|
||||
"Remote session invitation:",
|
||||
plain invitation
|
||||
]
|
||||
CRRemoteHostStarted {remoteHost_, invitation, localAddrs = RCCtrlAddress {address} :| addrs, ctrlPort} ->
|
||||
[plain $ maybe ("new remote host" <> started) (\RemoteHostInfo {remoteHostId = rhId} -> "remote host " <> show rhId <> started) remoteHost_]
|
||||
<> [plain $ "other addresses: " <> intercalate " " (map (\RCCtrlAddress {address = a} -> B.unpack (strEncode a)) addrs) | not (null addrs)]
|
||||
<> ["Remote session invitation:", plain invitation]
|
||||
where
|
||||
started = " started on " <> B.unpack (strEncode address) <> ":" <> ctrlPort
|
||||
CRRemoteFileStored rhId (CryptoFile filePath cfArgs_) ->
|
||||
@@ -274,8 +273,10 @@ chatResponseToView hu cfg@ChatConfig {logLevel, showReactions, testView} liveIte
|
||||
[ (maybe "connecting new remote controller" (\RemoteCtrlInfo {remoteCtrlId} -> "connecting remote controller " <> sShow remoteCtrlId) remoteCtrl_ <> ": ")
|
||||
<> viewRemoteCtrl ctrlAppInfo appVersion True
|
||||
]
|
||||
CRRemoteCtrlConnected RemoteCtrlInfo {remoteCtrlId = rcId, ctrlDeviceName} ->
|
||||
["remote controller " <> sShow rcId <> " session started with " <> plain ctrlDeviceName]
|
||||
CRRemoteCtrlConnected RemoteCtrlInfo {remoteCtrlId = rcId, ctrlDeviceName} compression ->
|
||||
["remote controller " <> sShow rcId <> " session started with " <> plain ctrlDeviceName <> " (" <> compressStr <> " compression)"]
|
||||
where
|
||||
compressStr = if compression then "with" else "no"
|
||||
CRSQLResult rows -> map plain rows
|
||||
#if !defined(dbPostgres)
|
||||
CRArchiveExported archiveErrs -> if null archiveErrs then ["ok"] else ["archive export errors: " <> plain (show archiveErrs)]
|
||||
@@ -487,7 +488,9 @@ chatEventToView hu ChatConfig {logLevel, showReactions, showReceipts, testView}
|
||||
plain sessionCode
|
||||
]
|
||||
CEvtNewRemoteHost RemoteHostInfo {remoteHostId = rhId, hostDeviceName} -> ["new remote host " <> sShow rhId <> " added: " <> plain hostDeviceName]
|
||||
CEvtRemoteHostConnected RemoteHostInfo {remoteHostId = rhId} -> ["remote host " <> sShow rhId <> " connected"]
|
||||
CEvtRemoteHostConnected RemoteHostInfo {remoteHostId = rhId} compression -> ["remote host " <> sShow rhId <> " connected (" <> compressStr <> " compression)"]
|
||||
where
|
||||
compressStr = if compression then "with" else "no"
|
||||
CEvtRemoteHostStopped {remoteHostId_} ->
|
||||
[ maybe "new remote host" (mappend "remote host " . sShow) remoteHostId_ <> " stopped"
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user