mirror of
https://github.com/simplex-chat/simplex-chat.git
synced 2026-03-30 22:55:48 +00:00
core: manage calls for all users (#1748)
This commit is contained in:
@@ -711,8 +711,7 @@ func apiEndCall(_ contact: Contact) async throws {
|
||||
}
|
||||
|
||||
func apiGetCallInvitations() throws -> [RcvCallInvitation] {
|
||||
guard let userId = ChatModel.shared.currentUser?.userId else { throw RuntimeError("apiGetCallInvitations: no current user") }
|
||||
let r = chatSendCmdSync(.apiGetCallInvitations(userId: userId))
|
||||
let r = chatSendCmdSync(.apiGetCallInvitations)
|
||||
if case let .callInvitations(invs) = r { return invs }
|
||||
throw r
|
||||
}
|
||||
|
||||
@@ -84,7 +84,7 @@ public enum ChatCommand {
|
||||
case apiSendCallAnswer(contact: Contact, answer: WebRTCSession)
|
||||
case apiSendCallExtraInfo(contact: Contact, extraInfo: WebRTCExtraInfo)
|
||||
case apiEndCall(contact: Contact)
|
||||
case apiGetCallInvitations(userId: Int64)
|
||||
case apiGetCallInvitations
|
||||
case apiCallStatus(contact: Contact, callStatus: WebRTCCallStatus)
|
||||
case apiChatRead(type: ChatType, id: Int64, itemRange: (Int64, Int64))
|
||||
case apiChatUnread(type: ChatType, id: Int64, unreadChat: Bool)
|
||||
@@ -169,7 +169,7 @@ public enum ChatCommand {
|
||||
case let .apiSendCallAnswer(contact, answer): return "/_call answer @\(contact.apiId) \(encodeJSON(answer))"
|
||||
case let .apiSendCallExtraInfo(contact, extraInfo): return "/_call extra @\(contact.apiId) \(encodeJSON(extraInfo))"
|
||||
case let .apiEndCall(contact): return "/_call end @\(contact.apiId)"
|
||||
case let .apiGetCallInvitations(userId): return "/_call get \(userId)"
|
||||
case .apiGetCallInvitations: return "/_call get"
|
||||
case let .apiCallStatus(contact, callStatus): return "/_call status @\(contact.apiId) \(callStatus.rawValue)"
|
||||
case let .apiChatRead(type, id, itemRange: (from, to)): return "/_read chat \(ref(type, id)) from=\(from) to=\(to)"
|
||||
case let .apiChatUnread(type, id, unreadChat): return "/_unread chat \(ref(type, id)) \(onOff(unreadChat))"
|
||||
|
||||
@@ -28,7 +28,7 @@ import qualified Data.ByteString.Base64 as B64
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Char (isSpace)
|
||||
import Data.Either (fromRight)
|
||||
import Data.Either (fromRight, rights)
|
||||
import Data.Fixed (div')
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
@@ -186,11 +186,11 @@ activeAgentServers ChatConfig {defaultServers = DefaultAgentServers {smp}} =
|
||||
. map (\ServerCfg {server} -> server)
|
||||
. filter (\ServerCfg {enabled} -> enabled)
|
||||
|
||||
startChatController :: forall m. (MonadUnliftIO m, MonadReader ChatController m) => User -> Bool -> Bool -> m (Async ())
|
||||
startChatController currentUser subConns enableExpireCIs = do
|
||||
startChatController :: forall m. (MonadUnliftIO m, MonadReader ChatController m) => Bool -> Bool -> m (Async ())
|
||||
startChatController subConns enableExpireCIs = do
|
||||
asks smpAgent >>= resumeAgentClient
|
||||
users <- fromRight [] <$> runExceptT (withStore' getUsers)
|
||||
restoreCalls currentUser
|
||||
restoreCalls
|
||||
s <- asks agentAsync
|
||||
readTVarIO s >>= maybe (start s users) (pure . fst)
|
||||
where
|
||||
@@ -227,9 +227,9 @@ subscribeUsers users = do
|
||||
subscribe :: [User] -> m ()
|
||||
subscribe = mapM_ $ runExceptT . subscribeUserConnections Agent.subscribeConnections
|
||||
|
||||
restoreCalls :: (MonadUnliftIO m, MonadReader ChatController m) => User -> m ()
|
||||
restoreCalls user = do
|
||||
savedCalls <- fromRight [] <$> runExceptT (withStore' $ \db -> getCalls db user)
|
||||
restoreCalls :: (MonadUnliftIO m, MonadReader ChatController m) => m ()
|
||||
restoreCalls = do
|
||||
savedCalls <- fromRight [] <$> runExceptT (withStore' $ \db -> getCalls db)
|
||||
let callsMap = M.fromList $ map (\call@Call {contactId} -> (contactId, call)) savedCalls
|
||||
calls <- asks currentCalls
|
||||
atomically $ writeTVar calls callsMap
|
||||
@@ -298,15 +298,15 @@ processChatCommand = \case
|
||||
setActive ActiveNone
|
||||
pure $ CRCmdOk Nothing
|
||||
DeleteUser uName -> withUserName uName APIDeleteUser
|
||||
StartChat subConns enableExpireCIs -> withUser' $ \user ->
|
||||
StartChat subConns enableExpireCIs -> withUser' $ \_ ->
|
||||
asks agentAsync >>= readTVarIO >>= \case
|
||||
Just _ -> pure CRChatRunning
|
||||
_ -> checkStoreNotChanged $ startChatController user subConns enableExpireCIs $> CRChatStarted
|
||||
_ -> checkStoreNotChanged $ startChatController subConns enableExpireCIs $> CRChatStarted
|
||||
APIStopChat -> do
|
||||
ask >>= stopChatController
|
||||
pure CRChatStopped
|
||||
APIActivateChat -> withUser $ \user -> do
|
||||
restoreCalls user
|
||||
APIActivateChat -> withUser $ \_ -> do
|
||||
restoreCalls
|
||||
withAgent activateAgent
|
||||
setAllExpireCIFlags True
|
||||
pure $ CRCmdOk Nothing
|
||||
@@ -702,25 +702,25 @@ processChatCommand = \case
|
||||
_ -> throwChatError . CECallState $ callStateTag callState
|
||||
APISendCallOffer contactId WebRTCCallOffer {callType, rtcSession} ->
|
||||
-- party accepting call
|
||||
withCurrentCall contactId $ \userId ct call@Call {callId, chatItemId, callState} -> case callState of
|
||||
withCurrentCall contactId $ \user ct call@Call {callId, chatItemId, callState} -> case callState of
|
||||
CallInvitationReceived {peerCallType, localDhPubKey, sharedKey} -> do
|
||||
let callDhPubKey = if encryptedCall callType then localDhPubKey else Nothing
|
||||
offer = CallOffer {callType, rtcSession, callDhPubKey}
|
||||
callState' = CallOfferSent {localCallType = callType, peerCallType, localCallSession = rtcSession, sharedKey}
|
||||
aciContent = ACIContent SMDRcv $ CIRcvCall CISCallAccepted 0
|
||||
(SndMessage {msgId}, _) <- sendDirectContactMessage ct (XCallOffer callId offer)
|
||||
withStore' $ \db -> updateDirectChatItemsRead db userId contactId $ Just (chatItemId, chatItemId)
|
||||
updateDirectChatItemView userId ct chatItemId aciContent False $ Just msgId
|
||||
withStore' $ \db -> updateDirectChatItemsRead db user contactId $ Just (chatItemId, chatItemId)
|
||||
updateDirectChatItemView user ct chatItemId aciContent False $ Just msgId
|
||||
pure $ Just call {callState = callState'}
|
||||
_ -> throwChatError . CECallState $ callStateTag callState
|
||||
APISendCallAnswer contactId rtcSession ->
|
||||
-- party initiating call
|
||||
withCurrentCall contactId $ \userId ct call@Call {callId, chatItemId, callState} -> case callState of
|
||||
withCurrentCall contactId $ \user ct call@Call {callId, chatItemId, callState} -> case callState of
|
||||
CallOfferReceived {localCallType, peerCallType, peerCallSession, sharedKey} -> do
|
||||
let callState' = CallNegotiated {localCallType, peerCallType, localCallSession = rtcSession, peerCallSession, sharedKey}
|
||||
aciContent = ACIContent SMDSnd $ CISndCall CISCallNegotiated 0
|
||||
(SndMessage {msgId}, _) <- sendDirectContactMessage ct (XCallAnswer callId CallAnswer {rtcSession})
|
||||
updateDirectChatItemView userId ct chatItemId aciContent False $ Just msgId
|
||||
updateDirectChatItemView user ct chatItemId aciContent False $ Just msgId
|
||||
pure $ Just call {callState = callState'}
|
||||
_ -> throwChatError . CECallState $ callStateTag callState
|
||||
APISendCallExtraInfo contactId rtcExtraInfo ->
|
||||
@@ -739,25 +739,26 @@ processChatCommand = \case
|
||||
_ -> throwChatError . CECallState $ callStateTag callState
|
||||
APIEndCall contactId ->
|
||||
-- any call party
|
||||
withCurrentCall contactId $ \userId ct call@Call {callId} -> do
|
||||
withCurrentCall contactId $ \user ct call@Call {callId} -> do
|
||||
(SndMessage {msgId}, _) <- sendDirectContactMessage ct (XCallEnd callId)
|
||||
updateCallItemStatus userId ct call WCSDisconnected $ Just msgId
|
||||
updateCallItemStatus user ct call WCSDisconnected $ Just msgId
|
||||
pure Nothing
|
||||
APIGetCallInvitations userId -> withUserId userId $ \user -> do
|
||||
APIGetCallInvitations -> withUser $ \_ -> do
|
||||
calls <- asks currentCalls >>= readTVarIO
|
||||
let invs = mapMaybe callInvitation $ M.elems calls
|
||||
rcvCallInvitations <- mapM (rcvCallInvitation user) invs
|
||||
pure $ CRCallInvitations user rcvCallInvitations
|
||||
rcvCallInvitations <- rights <$> mapM rcvCallInvitation invs
|
||||
pure $ CRCallInvitations rcvCallInvitations
|
||||
where
|
||||
callInvitation Call {contactId, callState, callTs} = case callState of
|
||||
CallInvitationReceived {peerCallType, sharedKey} -> Just (contactId, callTs, peerCallType, sharedKey)
|
||||
_ -> Nothing
|
||||
rcvCallInvitation user (contactId, callTs, peerCallType, sharedKey) = do
|
||||
contact <- withStore (\db -> getContact db user contactId)
|
||||
rcvCallInvitation (contactId, callTs, peerCallType, sharedKey) = runExceptT . withStore $ \db -> do
|
||||
user <- getUserByContactId db contactId
|
||||
contact <- getContact db user contactId
|
||||
pure RcvCallInvitation {contact, callType = peerCallType, sharedKey, callTs}
|
||||
APICallStatus contactId receivedStatus ->
|
||||
withCurrentCall contactId $ \userId ct call ->
|
||||
updateCallItemStatus userId ct call receivedStatus Nothing $> Just call
|
||||
withCurrentCall contactId $ \user ct call ->
|
||||
updateCallItemStatus user ct call receivedStatus Nothing $> Just call
|
||||
APIUpdateProfile userId profile -> withUserId userId (`updateProfile` profile)
|
||||
APISetContactPrefs contactId prefs' -> withUser $ \user -> do
|
||||
ct <- withStore $ \db -> getContact db user contactId
|
||||
@@ -1470,8 +1471,10 @@ processChatCommand = \case
|
||||
let s = connStatus $ activeConn (ct :: Contact)
|
||||
in s == ConnReady || s == ConnSndReady
|
||||
withCurrentCall :: ContactId -> (User -> Contact -> Call -> m (Maybe Call)) -> m ChatResponse
|
||||
withCurrentCall ctId action = withUser $ \user -> do
|
||||
ct <- withStore $ \db -> getContact db user ctId
|
||||
withCurrentCall ctId action = do
|
||||
(user, ct) <- withStore $ \db -> do
|
||||
user <- getUserByContactId db ctId
|
||||
(user,) <$> getContact db user ctId
|
||||
calls <- asks currentCalls
|
||||
withChatLock "currentCall" $
|
||||
atomically (TM.lookup ctId calls) >>= \case
|
||||
@@ -3878,7 +3881,7 @@ chatCommandP =
|
||||
"/_call extra @" *> (APISendCallExtraInfo <$> A.decimal <* A.space <*> jsonP),
|
||||
"/_call end @" *> (APIEndCall <$> A.decimal),
|
||||
"/_call status @" *> (APICallStatus <$> A.decimal <* A.space <*> strP),
|
||||
"/_call get " *> (APIGetCallInvitations <$> A.decimal),
|
||||
"/_call get" $> APIGetCallInvitations,
|
||||
"/_profile " *> (APIUpdateProfile <$> A.decimal <* A.space <*> jsonP),
|
||||
"/_set alias @" *> (APISetContactAlias <$> A.decimal <*> (A.space *> textP <|> pure "")),
|
||||
"/_set alias :" *> (APISetConnectionAlias <$> A.decimal <*> (A.space *> textP <|> pure "")),
|
||||
|
||||
@@ -183,7 +183,7 @@ data ChatCommand
|
||||
| APISendCallAnswer ContactId WebRTCSession
|
||||
| APISendCallExtraInfo ContactId WebRTCExtraInfo
|
||||
| APIEndCall ContactId
|
||||
| APIGetCallInvitations UserId
|
||||
| APIGetCallInvitations
|
||||
| APICallStatus ContactId WebRTCCallStatus
|
||||
| APIUpdateProfile UserId Profile
|
||||
| APISetContactPrefs ContactId Preferences
|
||||
@@ -431,7 +431,7 @@ data ChatResponse
|
||||
| CRCallAnswer {user :: User, contact :: Contact, answer :: WebRTCSession}
|
||||
| CRCallExtraInfo {user :: User, contact :: Contact, extraInfo :: WebRTCExtraInfo}
|
||||
| CRCallEnded {user :: User, contact :: Contact}
|
||||
| CRCallInvitations {user :: User, callInvitations :: [RcvCallInvitation]}
|
||||
| CRCallInvitations {callInvitations :: [RcvCallInvitation]}
|
||||
| CRUserContactLinkSubscribed -- TODO delete
|
||||
| CRUserContactLinkSubError {chatError :: ChatError} -- TODO delete
|
||||
| CRNtfTokenStatus {status :: NtfTknStatus}
|
||||
|
||||
@@ -30,7 +30,7 @@ runSimplexChat :: ChatOpts -> User -> ChatController -> (User -> ChatController
|
||||
runSimplexChat ChatOpts {maintenance} u cc chat
|
||||
| maintenance = wait =<< async (chat u cc)
|
||||
| otherwise = do
|
||||
a1 <- runReaderT (startChatController u True True) cc
|
||||
a1 <- runReaderT (startChatController True True) cc
|
||||
a2 <- async $ chat u cc
|
||||
waitEither_ a1 a2
|
||||
|
||||
|
||||
@@ -31,6 +31,7 @@ module Simplex.Chat.Store
|
||||
getSetActiveUser,
|
||||
getUserIdByName,
|
||||
getUserByAConnId,
|
||||
getUserByContactId,
|
||||
createDirectConnection,
|
||||
createConnReqConnection,
|
||||
getProfileById,
|
||||
@@ -455,10 +456,10 @@ getUsers db =
|
||||
userQuery :: Query
|
||||
userQuery =
|
||||
[sql|
|
||||
SELECT u.user_id, u.agent_user_id, u.contact_id, cp.contact_profile_id, u.active_user, u.local_display_name, cp.full_name, cp.image, cp.preferences
|
||||
SELECT u.user_id, u.agent_user_id, u.contact_id, ucp.contact_profile_id, u.active_user, u.local_display_name, ucp.full_name, ucp.image, ucp.preferences
|
||||
FROM users u
|
||||
JOIN contacts ct ON ct.contact_id = u.contact_id
|
||||
JOIN contact_profiles cp ON cp.contact_profile_id = ct.contact_profile_id
|
||||
JOIN contacts uct ON uct.contact_id = u.contact_id
|
||||
JOIN contact_profiles ucp ON ucp.contact_profile_id = uct.contact_profile_id
|
||||
|]
|
||||
|
||||
toUser :: (UserId, UserId, ContactId, ProfileId, Bool, ContactName, Text, Maybe ImageData, Maybe Preferences) -> User
|
||||
@@ -491,6 +492,11 @@ getUserByAConnId db agentConnId =
|
||||
maybeFirstRow toUser $
|
||||
DB.query db (userQuery <> " JOIN connections c ON c.user_id = u.user_id WHERE c.agent_conn_id = ?") (Only agentConnId)
|
||||
|
||||
getUserByContactId :: DB.Connection -> ContactId -> ExceptT StoreError IO User
|
||||
getUserByContactId db contactId =
|
||||
ExceptT . firstRow toUser (SEUserNotFoundByContactId contactId) $
|
||||
DB.query db (userQuery <> " JOIN contacts ct ON ct.user_id = u.user_id WHERE ct.contact_id = ?") (Only contactId)
|
||||
|
||||
createConnReqConnection :: DB.Connection -> UserId -> ConnId -> ConnReqUriHash -> XContactId -> Maybe Profile -> Maybe GroupLinkId -> IO PendingContactConnection
|
||||
createConnReqConnection db userId acId cReqHash xContactId incognitoProfile groupLinkId = do
|
||||
createdAt <- getCurrentTime
|
||||
@@ -4552,19 +4558,17 @@ deleteCalls :: DB.Connection -> User -> ContactId -> IO ()
|
||||
deleteCalls db User {userId} contactId = do
|
||||
DB.execute db "DELETE FROM calls WHERE user_id = ? AND contact_id = ?" (userId, contactId)
|
||||
|
||||
getCalls :: DB.Connection -> User -> IO [Call]
|
||||
getCalls db User {userId} = do
|
||||
getCalls :: DB.Connection -> IO [Call]
|
||||
getCalls db =
|
||||
map toCall
|
||||
<$> DB.query
|
||||
<$> DB.query_
|
||||
db
|
||||
[sql|
|
||||
SELECT
|
||||
contact_id, shared_call_id, chat_item_id, call_state, call_ts
|
||||
FROM calls
|
||||
WHERE user_id = ?
|
||||
ORDER BY call_ts ASC
|
||||
|]
|
||||
(Only userId)
|
||||
where
|
||||
toCall :: (ContactId, CallId, ChatItemId, CallState, UTCTime) -> Call
|
||||
toCall (contactId, callId, chatItemId, callState, callTs) = Call {contactId, callId, chatItemId, callState, callTs}
|
||||
@@ -4849,6 +4853,7 @@ data StoreError
|
||||
= SEDuplicateName
|
||||
| SEUserNotFound {userId :: UserId}
|
||||
| SEUserNotFoundByName {contactName :: ContactName}
|
||||
| SEUserNotFoundByContactId {contactId :: ContactId}
|
||||
| SEContactNotFound {contactId :: ContactId}
|
||||
| SEContactNotFoundByName {contactName :: ContactName}
|
||||
| SEContactNotReady {contactName :: ContactName}
|
||||
|
||||
@@ -201,7 +201,7 @@ responseToView user_ testView liveItems ts = \case
|
||||
CRCallAnswer {user = u, contact, answer} -> ttyUser u $ viewCallAnswer contact answer
|
||||
CRCallExtraInfo {user = u, contact} -> ttyUser u ["call extra info from " <> ttyContact' contact]
|
||||
CRCallEnded {user = u, contact} -> ttyUser u ["call with " <> ttyContact' contact <> " ended"]
|
||||
CRCallInvitations u _ -> ttyUser u []
|
||||
CRCallInvitations _ -> []
|
||||
CRUserContactLinkSubscribed -> ["Your address is active! To show: " <> highlight' "/sa"]
|
||||
CRUserContactLinkSubError e -> ["user address error: " <> sShow e, "to delete your address: " <> highlight' "/da"]
|
||||
CRNewContactConnection u _ -> ttyUser u []
|
||||
|
||||
@@ -4186,7 +4186,7 @@ testNegotiateCall =
|
||||
testChat2 aliceProfile bobProfile $ \alice bob -> do
|
||||
connectUsers alice bob
|
||||
-- just for testing db query
|
||||
alice ##> "/_call get 1"
|
||||
alice ##> "/_call get"
|
||||
-- alice invite bob to call
|
||||
alice ##> ("/_call invite @2 " <> serialize testCallType)
|
||||
alice <## "ok"
|
||||
|
||||
Reference in New Issue
Block a user