core: http transport for remote session (#3178)

* Wire some of the session endpoints

* Start sending remote commands

* Expand remote controller

- Fix queues for pumping to remote
- Add 3-way test
- WIP: Add TTY wrapper for remote hosts
- Stop remote controller w/o ids to match starting

* Fix view events

* Drop notifications, add message test

* refactor, receive test

* hunt down stray asyncs

* Take discovery sockets in brackets

---------

Co-authored-by: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com>
This commit is contained in:
Alexander Bondarenko
2023-10-07 16:23:24 +03:00
committed by GitHub
parent 3ac342782b
commit 91561da351
14 changed files with 376 additions and 172 deletions
+83 -12
View File
@@ -72,6 +72,7 @@ import Simplex.Messaging.Protocol (AProtoServerWithAuth, AProtocolType (..), Cor
import Simplex.Messaging.TMap (TMap)
import Simplex.Messaging.Transport (simplexMQVersion)
import Simplex.Messaging.Transport.Client (TransportHost)
import Simplex.Messaging.Transport.HTTP2.Client (HTTP2Client)
import Simplex.Messaging.Util (allFinally, catchAllErrors, liftEitherError, tryAllErrors, (<$$>))
import Simplex.Messaging.Version
import System.IO (Handle)
@@ -171,6 +172,7 @@ data ChatDatabase = ChatDatabase {chatStore :: SQLiteStore, agentStore :: SQLite
data ChatController = ChatController
{ currentUser :: TVar (Maybe User),
currentRemoteHost :: TVar (Maybe RemoteHostId),
activeTo :: TVar ActiveTo,
firstTime :: Bool,
smpAgent :: AgentClient,
@@ -424,6 +426,7 @@ data ChatCommand
| CreateRemoteHost -- ^ Configure a new remote host
| ListRemoteHosts
| StartRemoteHost RemoteHostId -- ^ Start and announce a remote host
-- | SwitchRemoteHost (Maybe RemoteHostId) -- ^ Switch current remote host
| StopRemoteHost RemoteHostId -- ^ Shut down a running session
| DeleteRemoteHost RemoteHostId -- ^ Unregister remote host and remove its data
| RegisterRemoteCtrl RemoteCtrlOOB -- ^ Register OOB data for satellite discovery and handshake
@@ -431,7 +434,7 @@ data ChatCommand
| ListRemoteCtrls
| AcceptRemoteCtrl RemoteCtrlId -- ^ Accept discovered data and store confirmation
| RejectRemoteCtrl RemoteCtrlId -- ^ Reject and blacklist discovered data
| StopRemoteCtrl RemoteCtrlId -- ^ Stop listening for announcements or terminate an active session
| StopRemoteCtrl -- ^ Stop listening for announcements or terminate an active session
| DeleteRemoteCtrl RemoteCtrlId -- ^ Remove all local data associated with a satellite session
| QuitChat
| ShowVersion
@@ -442,6 +445,29 @@ data ChatCommand
| GetAgentSubsDetails
deriving (Show)
allowRemoteCommand :: ChatCommand -> Bool -- XXX: consider using Relay/Block/ForceLocal
allowRemoteCommand = \case
StartChat {} -> False
APIStopChat -> False
APIActivateChat -> False
APISuspendChat {} -> False
SetTempFolder {} -> False
QuitChat -> False
CreateRemoteHost -> False
ListRemoteHosts -> False
StartRemoteHost {} -> False
-- SwitchRemoteHost {} -> False
StopRemoteHost {} -> False
DeleteRemoteHost {} -> False
RegisterRemoteCtrl {} -> False
StartRemoteCtrl -> False
ListRemoteCtrls -> False
AcceptRemoteCtrl {} -> False
RejectRemoteCtrl {} -> False
StopRemoteCtrl -> False
DeleteRemoteCtrl {} -> False
_ -> True
data ChatResponse
= CRActiveUser {user :: User}
| CRUsersList {users :: [UserInfo]}
@@ -619,7 +645,7 @@ data ChatResponse
| CRRemoteCtrlRejected {remoteCtrlId :: RemoteCtrlId}
| CRRemoteCtrlConnecting {remoteCtrlId :: RemoteCtrlId, displayName :: Text}
| CRRemoteCtrlConnected {remoteCtrlId :: RemoteCtrlId, displayName :: Text}
| CRRemoteCtrlStopped {remoteCtrlId :: RemoteCtrlId}
| CRRemoteCtrlStopped {_nullary :: Maybe Int}
| CRRemoteCtrlDeleted {remoteCtrlId :: RemoteCtrlId}
| CRSQLResult {rows :: [Text]}
| CRSlowSQLQueries {chatQueries :: [SlowSQLQuery], agentQueries :: [SlowSQLQuery]}
@@ -638,6 +664,27 @@ data ChatResponse
| CRTimedAction {action :: String, durationMilliseconds :: Int64}
deriving (Show)
allowRemoteEvent :: ChatResponse -> Bool
allowRemoteEvent = \case
CRRemoteHostCreated {} -> False
CRRemoteHostList {} -> False
CRRemoteHostStarted {} -> False
CRRemoteHostConnected {} -> False
CRRemoteHostStopped {} -> False
CRRemoteHostDeleted {} -> False
CRRemoteCtrlList {} -> False
CRRemoteCtrlRegistered {} -> False
CRRemoteCtrlStarted {} -> False
CRRemoteCtrlAnnounce {} -> False
CRRemoteCtrlFound {} -> False
CRRemoteCtrlAccepted {} -> False
CRRemoteCtrlRejected {} -> False
CRRemoteCtrlConnecting {} -> False
CRRemoteCtrlConnected {} -> False
CRRemoteCtrlStopped {} -> False
CRRemoteCtrlDeleted {} -> False
_ -> True
logResponseToFile :: ChatResponse -> Bool
logResponseToFile = \case
CRContactsDisconnected {} -> True
@@ -1107,6 +1154,27 @@ instance ToJSON ArchiveError where
toJSON = J.genericToJSON . sumTypeJSON $ dropPrefix "AE"
toEncoding = J.genericToEncoding . sumTypeJSON $ dropPrefix "AE"
data RemoteHostSession
= RemoteHostSessionStarting
{ announcer :: Async ()
}
| RemoteHostSessionStarted
{ -- | Path for local resources to be synchronized with host
storePath :: FilePath,
ctrlClient :: HTTP2Client
}
data RemoteCtrlSession = RemoteCtrlSession
{ -- | Server side of transport to process remote commands and forward notifications
discoverer :: Async (),
supervisor :: Async (),
hostServer :: Maybe (Async ()),
discovered :: TMap C.KeyHash TransportHost,
accepted :: TMVar RemoteCtrlId,
remoteOutputQ :: TBQueue ChatResponse,
remoteNotifyQ :: TBQueue Notification
}
type ChatMonad' m = (MonadUnliftIO m, MonadReader ChatController m)
type ChatMonad m = (ChatMonad' m, MonadError ChatError m)
@@ -1152,16 +1220,19 @@ unsetActive a = asks activeTo >>= atomically . (`modifyTVar` unset)
-- | Emit local events.
toView :: ChatMonad' m => ChatResponse -> m ()
toView = toView_ Nothing
-- | Used by transport to mark remote events with source.
toViewRemote :: ChatMonad' m => RemoteHostId -> ChatResponse -> m ()
toViewRemote = toView_ . Just
toView_ :: ChatMonad' m => Maybe RemoteHostId -> ChatResponse -> m ()
toView_ rh event = do
q <- asks outputQ
atomically $ writeTBQueue q (Nothing, rh, event)
toView event = do
localQ <- asks outputQ
chatReadVar remoteCtrlSession >>= \case
Nothing -> atomically $ writeTBQueue localQ (Nothing, Nothing, event)
Just RemoteCtrlSession {remoteOutputQ} ->
if allowRemoteEvent event
then do
-- TODO: filter events or let the UI ignore trigger events by itself?
-- traceM $ "Sending event to remote Q: " <> show event
atomically $ writeTBQueue remoteOutputQ event -- TODO: check full?
else do
-- traceM $ "Sending event to local Q: " <> show event
atomically $ writeTBQueue localQ (Nothing, Nothing, event)
withStore' :: ChatMonad m => (DB.Connection -> IO a) -> m a
withStore' action = withStore $ liftIO . action
+2 -2
View File
@@ -43,7 +43,7 @@ import Simplex.Messaging.Agent.Protocol (AgentMsgId, MsgMeta (..), MsgReceiptSta
import Simplex.Messaging.Crypto.File (CryptoFile (..))
import qualified Simplex.Messaging.Crypto.File as CF
import Simplex.Messaging.Encoding.String
import Simplex.Messaging.Parsers (dropPrefix, enumJSON, fromTextField_, parseAll, sumTypeJSON)
import Simplex.Messaging.Parsers (dropPrefix, enumJSON, fromTextField_, parseAll, enumJSON, sumTypeJSON)
import Simplex.Messaging.Protocol (MsgBody)
import Simplex.Messaging.Util (eitherToMaybe, safeDecodeUtf8, (<$?>))
@@ -880,7 +880,7 @@ data SndCIStatusProgress
deriving (Eq, Show, Generic)
instance FromJSON SndCIStatusProgress where
parseJSON = J.genericParseJSON . sumTypeJSON $ dropPrefix "SSP"
parseJSON = J.genericParseJSON . enumJSON $ dropPrefix "SSP"
instance ToJSON SndCIStatusProgress where
toJSON = J.genericToJSON . enumJSON $ dropPrefix "SSP"
+151 -68
View File
@@ -4,12 +4,15 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
module Simplex.Chat.Remote where
import Control.Logger.Simple
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Class
import Control.Monad.Reader (asks)
import Control.Monad.STM (retry)
import Crypto.Random (getRandomBytes)
import qualified Data.Aeson as J
@@ -18,9 +21,13 @@ import Data.ByteString (ByteString)
import qualified Data.ByteString.Base64.URL as B64U
import qualified Data.ByteString.Char8 as B
import Data.List.NonEmpty (NonEmpty (..))
import Data.Maybe (fromMaybe)
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import qualified Network.HTTP.Types as HTTP
import qualified Network.HTTP.Types.Status as Status
import qualified Network.HTTP2.Client as HTTP2Client
import qualified Network.HTTP2.Server as HTTP2Server
import Network.Socket (SockAddr (..), hostAddressToTuple)
import Simplex.Chat.Controller
import qualified Simplex.Chat.Remote.Discovery as Discovery
@@ -36,7 +43,7 @@ import Simplex.Messaging.Transport.HTTP2 (HTTP2Body (..))
import Simplex.Messaging.Transport.HTTP2.Client (HTTP2Client)
import qualified Simplex.Messaging.Transport.HTTP2.Client as HTTP2
import qualified Simplex.Messaging.Transport.HTTP2.Server as HTTP2
import Simplex.Messaging.Util (bshow)
import Simplex.Messaging.Util (bshow, ifM, tshow)
import System.Directory (getFileSize)
import UnliftIO
@@ -54,32 +61,67 @@ withRemoteHost remoteHostId action =
startRemoteHost :: (ChatMonad m) => RemoteHostId -> m ChatResponse
startRemoteHost remoteHostId = do
M.lookup remoteHostId <$> chatReadVar remoteHostSessions >>= \case
asks remoteHostSessions >>= atomically . TM.lookup remoteHostId >>= \case
Just _ -> throwError $ ChatErrorRemoteHost remoteHostId RHBusy
Nothing -> withRemoteHost remoteHostId run
where
run RemoteHost {storePath, caKey, caCert} = do
announcer <- async $ do
cleanup <- toIO $ closeRemoteHostSession remoteHostId >>= toView
let parent = (C.signatureKeyPair caKey, caCert)
sessionCreds <- liftIO $ genCredentials (Just parent) (0, 24) "Session"
let (fingerprint, credentials) = tlsCredentials $ sessionCreds :| [parent]
Discovery.announceRevHTTP2 cleanup fingerprint credentials >>= \case
Left todo'err -> liftIO cleanup -- TODO: log error
Right ctrlClient -> do
chatModifyVar remoteHostSessions $ M.insert remoteHostId RemoteHostSessionStarted {storePath, ctrlClient}
-- TODO: start streaming outputQ
toView CRRemoteHostConnected {remoteHostId}
Nothing -> withRemoteHost remoteHostId $ \rh -> do
announcer <- async $ run rh
chatModifyVar remoteHostSessions $ M.insert remoteHostId RemoteHostSessionStarting {announcer}
pure CRRemoteHostStarted {remoteHostId}
where
cleanup finished = do
logInfo "Remote host http2 client fininshed"
atomically $ writeTVar finished True
closeRemoteHostSession remoteHostId >>= toView
run RemoteHost {storePath, caKey, caCert} = do
finished <- newTVarIO False
let parent = (C.signatureKeyPair caKey, caCert)
sessionCreds <- liftIO $ genCredentials (Just parent) (0, 24) "Session"
let (fingerprint, credentials) = tlsCredentials $ sessionCreds :| [parent]
Discovery.announceRevHTTP2 (cleanup finished) fingerprint credentials >>= \case
Left h2ce -> do
logError $ "Failed to set up remote host connection: " <> tshow h2ce
cleanup finished
Right ctrlClient -> do
chatModifyVar remoteHostSessions $ M.insert remoteHostId RemoteHostSessionStarted {storePath, ctrlClient}
chatWriteVar currentRemoteHost $ Just remoteHostId
sendHello ctrlClient >>= \case
Left h2ce -> do
logError $ "Failed to send initial remote host request: " <> tshow h2ce
cleanup finished
Right HTTP2.HTTP2Response {respBody = HTTP2Body {bodyHead}} -> do
logDebug $ "Got initial from remote host: " <> tshow bodyHead
_ <- asks outputQ >>= async . pollRemote finished ctrlClient "/recv" (Nothing, Just remoteHostId,)
toView CRRemoteHostConnected {remoteHostId}
sendHello :: (ChatMonad m) => HTTP2Client -> m (Either HTTP2.HTTP2ClientError HTTP2.HTTP2Response)
sendHello http = liftIO (HTTP2.sendRequestDirect http req Nothing)
where
req = HTTP2Client.requestNoBody "GET" "/" mempty
pollRemote :: (ChatMonad m, J.FromJSON a) => TVar Bool -> HTTP2Client -> ByteString -> (a -> b) -> TBQueue b -> m ()
pollRemote finished http path f queue = loop
where
loop = do
liftIO (HTTP2.sendRequestDirect http req Nothing) >>= \case
Left e -> logError $ "pollRemote: " <> tshow (path, e)
Right HTTP2.HTTP2Response {respBody = HTTP2Body {bodyHead}} ->
case J.eitherDecodeStrict' bodyHead of
Left e -> logError $ "pollRemote/decode: " <> tshow (path, e)
Right o -> atomically $ writeTBQueue queue (f o)
readTVarIO finished >>= (`unless` loop)
req = HTTP2Client.requestNoBody "GET" path mempty
closeRemoteHostSession :: (ChatMonad m) => RemoteHostId -> m ChatResponse
closeRemoteHostSession remoteHostId = withRemoteHostSession remoteHostId $ \session -> do
case session of
RemoteHostSessionStarting {announcer} -> cancel announcer
RemoteHostSessionStarted {ctrlClient} -> liftIO (HTTP2.closeHTTP2Client ctrlClient)
liftIO $ cancelRemoteHostSession session
chatWriteVar currentRemoteHost Nothing
chatModifyVar remoteHostSessions $ M.delete remoteHostId
pure CRRemoteHostStopped { remoteHostId }
pure CRRemoteHostStopped {remoteHostId}
cancelRemoteHostSession :: MonadUnliftIO m => RemoteHostSession -> m ()
cancelRemoteHostSession = \case
RemoteHostSessionStarting {announcer} -> cancel announcer
RemoteHostSessionStarted {ctrlClient} -> liftIO $ HTTP2.closeHTTP2Client ctrlClient
createRemoteHost :: (ChatMonad m) => m ChatResponse
createRemoteHost = do
@@ -87,10 +129,7 @@ createRemoteHost = do
((_, caKey), caCert) <- liftIO $ genCredentials Nothing (-25, 24 * 365) displayName
storePath <- liftIO randomStorePath
remoteHostId <- withStore' $ \db -> insertRemoteHost db storePath displayName caKey caCert
let oobData =
RemoteCtrlOOB
{ caFingerprint = C.certificateFingerprint caCert
}
let oobData = RemoteCtrlOOB {caFingerprint = C.certificateFingerprint caCert}
pure CRRemoteHostCreated {remoteHostId, oobData}
-- | Generate a random 16-char filepath without / in it by using base64url encoding.
@@ -113,41 +152,40 @@ deleteRemoteHost remoteHostId = withRemoteHost remoteHostId $ \rh -> do
pure CRRemoteHostDeleted {remoteHostId}
processRemoteCommand :: (ChatMonad m) => RemoteHostSession -> (ByteString, ChatCommand) -> m ChatResponse
processRemoteCommand RemoteHostSessionStarting {} _ = error "TODO: sending remote commands before session started"
processRemoteCommand RemoteHostSessionStarted {ctrlClient} (s, cmd) =
processRemoteCommand RemoteHostSessionStarting {} _ = pure . CRChatError Nothing . ChatError $ CEInternalError "sending remote commands before session started"
processRemoteCommand RemoteHostSessionStarted {ctrlClient} (s, cmd) = do
logDebug $ "processRemoteCommand: " <> T.pack (show s)
-- XXX: intercept and filter some commands
-- TODO: store missing files on remote host
relayCommand ctrlClient s
relayCommand :: (ChatMonad m) => HTTP2Client -> ByteString -> m ChatResponse
relayCommand http s =
postBytestring Nothing http "/relay" mempty s >>= \case
Left e -> error "TODO: http2chatError"
postBytestring Nothing http "/send" mempty s >>= \case
Left e -> err $ "relayCommand/post: " <> show e
Right HTTP2.HTTP2Response {respBody = HTTP2Body {bodyHead}} -> do
remoteChatResponse <-
if iTax
then case J.eitherDecodeStrict bodyHead of -- XXX: large JSONs can overflow into buffered chunks
Left e -> error "TODO: json2chatError" e
Right (raw :: J.Value) -> case J.fromJSON (sum2tagged raw) of
J.Error e -> error "TODO: json2chatError" e
J.Success cr -> pure cr
else case J.eitherDecodeStrict bodyHead of -- XXX: large JSONs can overflow into buffered chunks
Left e -> error "TODO: json2chatError" e
Right cr -> pure cr
logDebug $ "Got /send response: " <> T.pack (show bodyHead)
remoteChatResponse <- case J.eitherDecodeStrict bodyHead of -- XXX: large JSONs can overflow into buffered chunks
Left e -> err $ "relayCommand/decodeValue: " <> show e
Right json -> case J.fromJSON $ toTaggedJSON json of
J.Error e -> err $ "relayCommand/fromJSON: " <> show e
J.Success cr -> pure cr
case remoteChatResponse of
-- TODO: intercept file responses and fetch files when needed
-- XXX: is that even possible, to have a file response to a command?
_ -> pure remoteChatResponse
where
iTax = True -- TODO: get from RemoteHost
err = pure . CRChatError Nothing . ChatError . CEInternalError
toTaggedJSON :: J.Value -> J.Value
toTaggedJSON = id -- owsf2tagged TODO: get from RemoteHost
-- XXX: extract to http2 transport
postBytestring timeout c path hs body = liftIO $ HTTP2.sendRequest c req timeout
postBytestring timeout' c path hs body = liftIO $ HTTP2.sendRequestDirect c req timeout'
where
req = HTTP2Client.requestBuilder "POST" path hs (Binary.fromByteString body)
-- | Convert swift single-field sum encoding into tagged/discriminator-field
sum2tagged :: J.Value -> J.Value
sum2tagged = \case
owsf2tagged :: J.Value -> J.Value
owsf2tagged = \case
J.Object todo'convert -> J.Object todo'convert
skip -> skip
@@ -161,13 +199,13 @@ storeRemoteFile http localFile = do
where
postFile timeout c path hs file = liftIO $ do
fileSize <- fromIntegral <$> getFileSize file
HTTP2.sendRequest c (req fileSize) timeout
HTTP2.sendRequestDirect c (req fileSize) timeout
where
req size = HTTP2Client.requestFile "POST" path hs (HTTP2Client.FileSpec file 0 size)
req size = HTTP2Client.requestFile "PUT" path hs (HTTP2Client.FileSpec file 0 size)
fetchRemoteFile :: (ChatMonad m) => HTTP2Client -> FilePath -> FileTransferId -> m ChatResponse
fetchRemoteFile http storePath remoteFileId = do
liftIO (HTTP2.sendRequest http req Nothing) >>= \case
liftIO (HTTP2.sendRequestDirect http req Nothing) >>= \case
Left e -> error "TODO: http2chatError"
Right HTTP2.HTTP2Response {respBody} -> do
error "TODO: stream body into a local file" -- XXX: consult headers for a file name?
@@ -175,47 +213,84 @@ fetchRemoteFile http storePath remoteFileId = do
req = HTTP2Client.requestNoBody "GET" path mempty
path = "/fetch/" <> bshow remoteFileId
processControllerRequest :: (ChatMonad m) => RemoteCtrlId -> HTTP2.HTTP2Request -> m ()
processControllerRequest rc req = error "TODO: processControllerRequest"
processControllerRequest :: forall m . (ChatMonad m) => (ByteString -> m ChatResponse) -> HTTP2.HTTP2Request -> m ()
processControllerRequest execChatCommand HTTP2.HTTP2Request {request, reqBody = HTTP2Body {bodyHead}, sendResponse} = do
logDebug $ "Remote controller request: " <> T.pack (show $ method <> " " <> path)
res <- tryChatError $ case (method, path) of
("GET", "/") -> getHello
("POST", "/send") -> sendCommand
("GET", "/recv") -> recvMessage
("PUT", "/store") -> storeFile
("GET", "/fetch") -> fetchFile
unexpected -> respondWith Status.badRequest400 $ "unexpected method/path: " <> Binary.putStringUtf8 (show unexpected)
case res of
Left e -> logError $ "Error handling remote controller request: (" <> tshow (method <> " " <> path) <> "): " <> tshow e
Right () -> logDebug $ "Remote controller request: " <> tshow (method <> " " <> path) <> " OK"
where
method = fromMaybe "" $ HTTP2Server.requestMethod request
path = fromMaybe "" $ HTTP2Server.requestPath request
getHello = respond "OK"
sendCommand = execChatCommand bodyHead >>= respondJSON
recvMessage = chatReadVar remoteCtrlSession >>= \case
Nothing -> respondWith Status.internalServerError500 "session not active"
Just rcs -> atomically (readTBQueue $ remoteOutputQ rcs) >>= respondJSON
storeFile = respondWith Status.notImplemented501 "TODO: storeFile"
fetchFile = respondWith Status.notImplemented501 "TODO: fetchFile"
respondJSON :: J.ToJSON a => a -> m ()
respondJSON = respond . Binary.fromLazyByteString . J.encode
respond = respondWith Status.ok200
respondWith status = liftIO . sendResponse . HTTP2Server.responseBuilder status []
-- * ChatRequest handlers
startRemoteCtrl :: (ChatMonad m) => m ChatResponse
startRemoteCtrl =
startRemoteCtrl :: (ChatMonad m) => (ByteString -> m ChatResponse) -> m ChatResponse
startRemoteCtrl execChatCommand =
chatReadVar remoteCtrlSession >>= \case
Just _busy -> throwError $ ChatErrorRemoteCtrl RCEBusy
Nothing -> do
accepted <- newEmptyTMVarIO
size <- asks $ tbqSize . config
remoteOutputQ <- newTBQueueIO size
remoteNotifyQ <- newTBQueueIO size
discovered <- newTVarIO mempty
discoverer <- async $ discoverRemoteCtrls discovered
accepted <- newEmptyTMVarIO
supervisor <- async $ do
remoteCtrlId <- atomically (readTMVar accepted)
withRemoteCtrl remoteCtrlId $ \RemoteCtrl {displayName, fingerprint} -> do
source <- atomically $ TM.lookup fingerprint discovered >>= maybe retry pure
toView $ CRRemoteCtrlConnecting {remoteCtrlId, displayName}
atomically $ writeTVar discovered mempty -- flush unused sources
server <- async $ Discovery.connectRevHTTP2 source fingerprint (processControllerRequest remoteCtrlId)
server <- async $ Discovery.connectRevHTTP2 source fingerprint (processControllerRequest execChatCommand)
chatModifyVar remoteCtrlSession $ fmap $ \s -> s {hostServer = Just server}
toView $ CRRemoteCtrlConnected {remoteCtrlId, displayName}
_ <- waitCatch server
chatWriteVar remoteCtrlSession Nothing
toView $ CRRemoteCtrlStopped {remoteCtrlId}
chatWriteVar remoteCtrlSession $ Just RemoteCtrlSession {discoverer, supervisor, hostServer = Nothing, discovered, accepted}
toView $ CRRemoteCtrlStopped Nothing
chatWriteVar remoteCtrlSession $ Just RemoteCtrlSession {discoverer, supervisor, hostServer = Nothing, discovered, accepted, remoteOutputQ, remoteNotifyQ}
pure $ CRRemoteCtrlStarted Nothing
discoverRemoteCtrls :: (ChatMonad m) => TM.TMap C.KeyHash TransportHost -> m ()
discoverRemoteCtrls discovered = Discovery.openListener >>= go
discoverRemoteCtrls discovered = Discovery.withListener go
where
go sock =
Discovery.recvAnnounce sock >>= \case
(SockAddrInet _port addr, invite) -> case strDecode invite of
(SockAddrInet _sockPort sockAddr, invite) -> case strDecode invite of
Left _ -> go sock -- ignore malformed datagrams
Right fingerprint -> do
atomically $ TM.insert fingerprint (THIPv4 $ hostAddressToTuple addr) discovered
let addr = THIPv4 (hostAddressToTuple sockAddr)
ifM
(atomically $ TM.member fingerprint discovered)
(logDebug $ "Fingerprint announce already knwon: " <> T.pack (show (addr, invite)))
(do
logInfo $ "New fingerprint announce: " <> T.pack (show (addr, invite))
atomically $ TM.insert fingerprint addr discovered
)
withStore' (`getRemoteCtrlByFingerprint` fingerprint) >>= \case
Nothing -> toView $ CRRemoteCtrlAnnounce fingerprint -- unknown controller, ui action required
Nothing -> toView $ CRRemoteCtrlAnnounce fingerprint -- unknown controller, ui "register" action required
Just found@RemoteCtrl {remoteCtrlId, accepted=storedChoice} -> case storedChoice of
Nothing -> toView $ CRRemoteCtrlFound found -- first-time controller, ui action required
Nothing -> toView $ CRRemoteCtrlFound found -- first-time controller, ui "accept" action required
Just False -> pure () -- skipping a rejected item
Just True -> chatReadVar remoteCtrlSession >>= \case
Nothing -> toView . CRChatError Nothing . ChatError $ CEInternalError "Remote host found without running a session"
@@ -258,20 +333,28 @@ rejectRemoteCtrl remoteCtrlId = do
cancel supervisor
pure $ CRRemoteCtrlRejected {remoteCtrlId}
stopRemoteCtrl :: (ChatMonad m) => RemoteCtrlId -> m ChatResponse
stopRemoteCtrl remoteCtrlId =
stopRemoteCtrl :: (ChatMonad m) => m ChatResponse
stopRemoteCtrl =
chatReadVar remoteCtrlSession >>= \case
Nothing -> throwError $ ChatErrorRemoteCtrl RCEInactive
Just RemoteCtrlSession {discoverer, supervisor, hostServer} -> do
cancel discoverer -- may be gone by now
case hostServer of
Just host -> cancel host -- supervisor will clean up
Nothing -> do
cancel supervisor -- supervisor is blocked until session progresses
chatWriteVar remoteCtrlSession Nothing
toView $ CRRemoteCtrlStopped {remoteCtrlId}
Just rcs -> do
cancelRemoteCtrlSession rcs $ do
chatWriteVar remoteCtrlSession Nothing
toView $ CRRemoteCtrlStopped Nothing
pure $ CRCmdOk Nothing
cancelRemoteCtrlSession_ :: MonadUnliftIO m => RemoteCtrlSession -> m ()
cancelRemoteCtrlSession_ rcs = cancelRemoteCtrlSession rcs $ pure ()
cancelRemoteCtrlSession :: MonadUnliftIO m => RemoteCtrlSession -> m () -> m ()
cancelRemoteCtrlSession RemoteCtrlSession {discoverer, supervisor, hostServer} cleanup = do
cancel discoverer -- may be gone by now
case hostServer of
Just host -> cancel host -- supervisor will clean up
Nothing -> do
cancel supervisor -- supervisor is blocked until session progresses
cleanup
deleteRemoteCtrl :: (ChatMonad m) => RemoteCtrlId -> m ChatResponse
deleteRemoteCtrl remoteCtrlId =
chatReadVar remoteCtrlSession >>= \case
+17 -9
View File
@@ -12,6 +12,7 @@ module Simplex.Chat.Remote.Discovery
-- * Discovery
connectRevHTTP2,
withListener,
openListener,
recvAnnounce,
connectTLSClient,
@@ -32,7 +33,7 @@ import Simplex.Messaging.Transport (supportedParameters)
import qualified Simplex.Messaging.Transport as Transport
import Simplex.Messaging.Transport.Client (TransportHost (..), defaultTransportClientConfig, runTransportClient)
import Simplex.Messaging.Transport.HTTP2 (defaultHTTP2BufferSize, getHTTP2Body)
import Simplex.Messaging.Transport.HTTP2.Client (HTTP2Client, HTTP2ClientError, attachHTTP2Client, defaultHTTP2ClientConfig)
import Simplex.Messaging.Transport.HTTP2.Client (HTTP2Client, HTTP2ClientError, attachHTTP2Client, connTimeout, defaultHTTP2ClientConfig)
import Simplex.Messaging.Transport.HTTP2.Server (HTTP2Request (..), runHTTP2ServerWith)
import Simplex.Messaging.Transport.Server (defaultTransportServerConfig, runTransportServer)
import Simplex.Messaging.Util (whenM)
@@ -52,15 +53,16 @@ pattern BROADCAST_PORT = "5226"
-- | Announce tls server, wait for connection and attach http2 client to it.
--
-- Announcer is started when TLS server is started and stopped when a connection is made.
announceRevHTTP2 :: (StrEncoding invite, MonadUnliftIO m) => IO () -> invite -> TLS.Credentials -> m (Either HTTP2ClientError HTTP2Client)
announceRevHTTP2 :: (StrEncoding invite, MonadUnliftIO m) => m () -> invite -> TLS.Credentials -> m (Either HTTP2ClientError HTTP2Client)
announceRevHTTP2 finishAction invite credentials = do
httpClient <- newEmptyMVar
started <- newEmptyTMVarIO
finished <- newEmptyMVar
announcer <- async . liftIO . whenM (atomically $ takeTMVar started) $ runAnnouncer (strEncode invite)
tlsServer <- startTLSServer started credentials $ \tls -> cancel announcer >> runHTTP2Client finished httpClient tls
_ <- forkIO . liftIO $ do
_ <- forkIO $ do
readMVar finished
cancel announcer
cancel tlsServer
finishAction
readMVar httpClient
@@ -68,11 +70,12 @@ announceRevHTTP2 finishAction invite credentials = do
-- | Broadcast invite with link-local datagrams
runAnnouncer :: ByteString -> IO ()
runAnnouncer inviteBS = do
sock <- UDP.clientSocket BROADCAST_ADDR_V4 BROADCAST_PORT False
N.setSocketOption (UDP.udpSocket sock) N.Broadcast 1
forever $ do
UDP.send sock inviteBS
threadDelay 1000000
bracket (UDP.clientSocket BROADCAST_ADDR_V4 BROADCAST_PORT False) UDP.close $ \sock -> do
N.setSocketOption (UDP.udpSocket sock) N.Broadcast 1
N.setSocketOption (UDP.udpSocket sock) N.ReuseAddr 1
forever $ do
UDP.send sock inviteBS
threadDelay 1000000
startTLSServer :: (MonadUnliftIO m) => TMVar Bool -> TLS.Credentials -> (Transport.TLS -> IO ()) -> m (Async ())
startTLSServer started credentials = async . liftIO . runTransportServer started BROADCAST_PORT serverParams defaultTransportServerConfig
@@ -88,8 +91,13 @@ startTLSServer started credentials = async . liftIO . runTransportServer started
-- | Attach HTTP2 client and hold the TLS until the attached client finishes.
runHTTP2Client :: MVar () -> MVar (Either HTTP2ClientError HTTP2Client) -> Transport.TLS -> IO ()
runHTTP2Client finishedVar clientVar tls = do
attachHTTP2Client defaultHTTP2ClientConfig ANY_ADDR_V4 BROADCAST_PORT (putMVar finishedVar ()) defaultHTTP2BufferSize tls >>= putMVar clientVar
attachHTTP2Client config ANY_ADDR_V4 BROADCAST_PORT (putMVar finishedVar ()) defaultHTTP2BufferSize tls >>= putMVar clientVar
readMVar finishedVar
where
config = defaultHTTP2ClientConfig { connTimeout = 86400000000 }
withListener :: (MonadUnliftIO m) => (UDP.ListenSocket -> m a) -> m a
withListener = bracket openListener (liftIO . UDP.stop)
openListener :: (MonadIO m) => m UDP.ListenSocket
openListener = liftIO $ do
-24
View File
@@ -5,15 +5,10 @@
module Simplex.Chat.Remote.Types where
import Control.Concurrent.Async (Async)
import qualified Data.Aeson.TH as J
import Data.Int (Int64)
import Data.Text (Text)
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.TMap (TMap)
import Simplex.Messaging.Transport.Client (TransportHost)
import Simplex.Messaging.Transport.HTTP2.Client (HTTP2Client)
import UnliftIO.STM
type RemoteHostId = Int64
@@ -40,22 +35,3 @@ data RemoteCtrl = RemoteCtrl
deriving (Show)
$(J.deriveJSON J.defaultOptions ''RemoteCtrl)
data RemoteHostSession
= RemoteHostSessionStarting
{ announcer :: Async ()
}
| RemoteHostSessionStarted
{ -- | Path for local resources to be synchronized with host
storePath :: FilePath,
ctrlClient :: HTTP2Client
}
data RemoteCtrlSession = RemoteCtrlSession
{ -- | Server side of transport to process remote commands and forward notifications
discoverer :: Async (),
supervisor :: Async (),
hostServer :: Maybe (Async ()),
discovered :: TMap C.KeyHash TransportHost,
accepted :: TMVar RemoteCtrlId
}
+6 -4
View File
@@ -53,15 +53,16 @@ getKey =
runInputLoop :: ChatTerminal -> ChatController -> IO ()
runInputLoop ct@ChatTerminal {termState, liveMessageState} cc = forever $ do
s <- atomically . readTBQueue $ inputQ cc
rh <- readTVarIO $ currentRemoteHost cc
let bs = encodeUtf8 $ T.pack s
cmd = parseChatCommand bs
unless (isMessage cmd) $ echo s
r <- runReaderT (execChatCommand Nothing bs) cc
r <- runReaderT (execChatCommand rh bs) cc
case r of
CRChatCmdError _ _ -> when (isMessage cmd) $ echo s
CRChatError _ _ -> when (isMessage cmd) $ echo s
_ -> pure ()
printRespToTerminal ct cc False r
printRespToTerminal ct cc False rh r
startLiveMessage cmd r
where
echo s = printToTerminal ct [plain s]
@@ -134,7 +135,7 @@ runTerminalInput ct cc = withChatTerm ct $ do
receiveFromTTY cc ct
receiveFromTTY :: forall m. MonadTerminal m => ChatController -> ChatTerminal -> m ()
receiveFromTTY cc@ChatController {inputQ, activeTo, currentUser, chatStore} ct@ChatTerminal {termSize, termState, liveMessageState} =
receiveFromTTY cc@ChatController {inputQ, activeTo, currentUser, currentRemoteHost, chatStore} ct@ChatTerminal {termSize, termState, liveMessageState} =
forever $ getKey >>= liftIO . processKey >> withTermLock ct (updateInput ct)
where
processKey :: (Key, Modifiers) -> IO ()
@@ -166,7 +167,8 @@ receiveFromTTY cc@ChatController {inputQ, activeTo, currentUser, chatStore} ct@C
kill promptThreadId
atomically $ writeTVar liveMessageState Nothing
r <- sendUpdatedLiveMessage cc sentMsg lm False
printRespToTerminal ct cc False r
rh <- readTVarIO currentRemoteHost -- XXX: should be inherited from live message state
printRespToTerminal ct cc False rh r
where
kill sel = deRefWeak (sel lm) >>= mapM_ killThread
+10 -8
View File
@@ -21,6 +21,7 @@ import Simplex.Chat.Controller
import Simplex.Chat.Messages hiding (NewChatItem (..))
import Simplex.Chat.Styled
import Simplex.Chat.View
import Simplex.Chat.Remote.Types (RemoteHostId)
import System.Console.ANSI.Types
import System.IO (IOMode (..), hPutStrLn, withFile)
import System.Mem.Weak (Weak)
@@ -112,7 +113,7 @@ withTermLock ChatTerminal {termLock} action = do
runTerminalOutput :: ChatTerminal -> ChatController -> IO ()
runTerminalOutput ct cc@ChatController {outputQ, showLiveItems, logFilePath} = do
forever $ do
(_, _, r) <- atomically $ readTBQueue outputQ
(_, outputRH, r) <- atomically $ readTBQueue outputQ
case r of
CRNewChatItem _ ci -> markChatItemRead ci
CRChatItemUpdated _ ci -> markChatItemRead ci
@@ -121,7 +122,7 @@ runTerminalOutput ct cc@ChatController {outputQ, showLiveItems, logFilePath} = d
Just path -> if logResponseToFile r then logResponse path else printToTerminal ct
_ -> printToTerminal ct
liveItems <- readTVarIO showLiveItems
responseString cc liveItems r >>= printResp
responseString cc liveItems outputRH r >>= printResp
where
markChatItemRead (AChatItem _ _ chat item@ChatItem {chatDir, meta = CIMeta {itemStatus}}) =
case (muted chat chatDir, itemStatus) of
@@ -132,15 +133,16 @@ runTerminalOutput ct cc@ChatController {outputQ, showLiveItems, logFilePath} = d
_ -> pure ()
logResponse path s = withFile path AppendMode $ \h -> mapM_ (hPutStrLn h . unStyle) s
printRespToTerminal :: ChatTerminal -> ChatController -> Bool -> ChatResponse -> IO ()
printRespToTerminal ct cc liveItems r = responseString cc liveItems r >>= printToTerminal ct
printRespToTerminal :: ChatTerminal -> ChatController -> Bool -> Maybe RemoteHostId -> ChatResponse -> IO ()
printRespToTerminal ct cc liveItems outputRH r = responseString cc liveItems outputRH r >>= printToTerminal ct
responseString :: ChatController -> Bool -> ChatResponse -> IO [StyledString]
responseString cc liveItems r = do
user <- readTVarIO $ currentUser cc
responseString :: ChatController -> Bool -> Maybe RemoteHostId -> ChatResponse -> IO [StyledString]
responseString cc liveItems outputRH r = do
currentRH <- readTVarIO $ currentRemoteHost cc
user <- readTVarIO $ currentUser cc -- XXX: local user, should be subsumed by remote when connected
ts <- getCurrentTime
tz <- getCurrentTimeZone
pure $ responseToView user (config cc) liveItems ts tz r
pure $ responseToView (currentRH, user) (config cc) liveItems ts tz outputRH r
printToTerminal :: ChatTerminal -> [StyledString] -> IO ()
printToTerminal ct s =
+1
View File
@@ -1442,6 +1442,7 @@ serializeIntroStatus = \case
GMIntroConnected -> "con"
data Notification = Notification {title :: Text, text :: Text}
deriving (Show, Generic, FromJSON, ToJSON)
textParseJSON :: TextEncoding a => String -> J.Value -> JT.Parser a
textParseJSON name = J.withText name $ maybe (fail $ "bad " <> name) pure . textDecode
+12 -10
View File
@@ -66,11 +66,11 @@ import System.Console.ANSI.Types
type CurrentTime = UTCTime
serializeChatResponse :: Maybe User -> CurrentTime -> TimeZone -> ChatResponse -> String
serializeChatResponse user_ ts tz = unlines . map unStyle . responseToView user_ defaultChatConfig False ts tz
serializeChatResponse :: (Maybe RemoteHostId, Maybe User) -> CurrentTime -> TimeZone -> Maybe RemoteHostId -> ChatResponse -> String
serializeChatResponse user_ ts tz remoteHost_ = unlines . map unStyle . responseToView user_ defaultChatConfig False ts tz remoteHost_
responseToView :: Maybe User -> ChatConfig -> Bool -> CurrentTime -> TimeZone -> ChatResponse -> [StyledString]
responseToView user_ ChatConfig {logLevel, showReactions, showReceipts, testView} liveItems ts tz = \case
responseToView :: (Maybe RemoteHostId, Maybe User) -> ChatConfig -> Bool -> CurrentTime -> TimeZone -> Maybe RemoteHostId -> ChatResponse -> [StyledString]
responseToView (currentRH, user_) ChatConfig {logLevel, showReactions, showReceipts, testView} liveItems ts tz outputRH = \case
CRActiveUser User {profile} -> viewUserProfile $ fromLocalProfile profile
CRUsersList users -> viewUsersList users
CRChatStarted _ -> ["chat started"]
@@ -274,7 +274,7 @@ responseToView user_ ChatConfig {logLevel, showReactions, showReceipts, testView
CRRemoteCtrlRejected rcId -> ["remote controller " <> sShow rcId <> " rejected"]
CRRemoteCtrlConnecting rcId rcName -> ["remote controller " <> sShow rcId <> " connecting to " <> plain rcName]
CRRemoteCtrlConnected rcId rcName -> ["remote controller " <> sShow rcId <> " connected, " <> plain rcName]
CRRemoteCtrlStopped rcId -> ["remote controller " <> sShow rcId <> " stopped"]
CRRemoteCtrlStopped _ -> ["remote controller stopped"]
CRRemoteCtrlDeleted rcId -> ["remote controller " <> sShow rcId <> " deleted"]
CRSQLResult rows -> map plain rows
CRSlowSQLQueries {chatQueries, agentQueries} ->
@@ -323,12 +323,14 @@ responseToView user_ ChatConfig {logLevel, showReactions, showReceipts, testView
| otherwise = []
ttyUserPrefix :: User -> [StyledString] -> [StyledString]
ttyUserPrefix _ [] = []
ttyUserPrefix User {userId, localDisplayName = u} ss = prependFirst userPrefix ss
ttyUserPrefix User {userId, localDisplayName = u} ss = prependFirst prefix ss
where
userPrefix = case user_ of
Just User {userId = activeUserId} -> if userId /= activeUserId then prefix else ""
_ -> prefix
prefix = "[user: " <> highlight u <> "] "
prefix = if outputRH /= currentRH then r else userPrefix
r = case outputRH of
Nothing -> "[local] " <> userPrefix
Just rh -> "[remote: ]" <> highlight (show rh) <> "] "
userPrefix = if Just userId /= currentUserId then "[user: " <> highlight u <> "] " else ""
currentUserId = fmap (\User {userId} -> userId) user_
ttyUser' :: Maybe User -> [StyledString] -> [StyledString]
ttyUser' = maybe id ttyUser
ttyUserPrefix' :: Maybe User -> [StyledString] -> [StyledString]