mirror of
https://github.com/simplex-chat/simplex-chat.git
synced 2026-04-25 20:42:13 +00:00
TMVar lock to avoid subscriber and client processing in parallel, fix the test (#90)
* TMVar lock to avoid subscriber and client processing in parallel, fix the test * run SMP server as part of the test * stabilize tests * update simplexmq * test: stabilize getting invitation from terminal * remove unused import * simplify test
This commit is contained in:
committed by
GitHub
parent
a9d32db404
commit
d23417596e
@@ -50,6 +50,7 @@ import System.Exit (exitFailure, exitSuccess)
|
||||
import System.IO (hFlush, stdout)
|
||||
import Text.Read (readMaybe)
|
||||
import UnliftIO.Async (race_)
|
||||
import qualified UnliftIO.Exception as E
|
||||
import UnliftIO.STM
|
||||
|
||||
data ChatCommand
|
||||
@@ -111,7 +112,8 @@ newChatController ChatConfig {agentConfig = cfg, dbPoolSize, tbqSize} ChatOpts {
|
||||
idsDrg <- newTVarIO =<< drgNew
|
||||
inputQ <- newTBQueueIO tbqSize
|
||||
notifyQ <- newTBQueueIO tbqSize
|
||||
pure ChatController {currentUser, smpAgent, chatTerminal, chatStore, idsDrg, inputQ, notifyQ, sendNotification}
|
||||
chatLock <- newTMVarIO ()
|
||||
pure ChatController {currentUser, smpAgent, chatTerminal, chatStore, idsDrg, inputQ, notifyQ, sendNotification, chatLock}
|
||||
|
||||
runSimplexChat :: ChatController -> IO ()
|
||||
runSimplexChat = runReaderT (race_ runTerminalInput runChatController)
|
||||
@@ -124,9 +126,16 @@ runChatController =
|
||||
notificationSubscriber
|
||||
]
|
||||
|
||||
withLock :: MonadUnliftIO m => TMVar () -> m () -> m ()
|
||||
withLock lock =
|
||||
E.bracket_
|
||||
(void . atomically $ takeTMVar lock)
|
||||
(atomically $ putTMVar lock ())
|
||||
|
||||
inputSubscriber :: (MonadUnliftIO m, MonadReader ChatController m) => m ()
|
||||
inputSubscriber = do
|
||||
q <- asks inputQ
|
||||
l <- asks chatLock
|
||||
forever $
|
||||
atomically (readTBQueue q) >>= \case
|
||||
InputControl _ -> pure ()
|
||||
@@ -139,7 +148,8 @@ inputSubscriber = do
|
||||
SendGroupMessage g msg -> showSentGroupMessage g msg
|
||||
_ -> printToView [plain s]
|
||||
user <- asks currentUser
|
||||
void . runExceptT $ processChatCommand user cmd `catchError` showChatError
|
||||
withLock l . void . runExceptT $
|
||||
processChatCommand user cmd `catchError` showChatError
|
||||
|
||||
processChatCommand :: ChatMonad m => User -> ChatCommand -> m ()
|
||||
processChatCommand user@User {userId, profile} = \case
|
||||
@@ -244,12 +254,14 @@ processChatCommand user@User {userId, profile} = \case
|
||||
agentSubscriber :: (MonadUnliftIO m, MonadReader ChatController m) => m ()
|
||||
agentSubscriber = do
|
||||
q <- asks $ subQ . smpAgent
|
||||
l <- asks chatLock
|
||||
subscribeUserConnections
|
||||
forever $ do
|
||||
(_, connId, msg) <- atomically $ readTBQueue q
|
||||
user <- asks currentUser
|
||||
-- TODO handle errors properly
|
||||
void . runExceptT $ processAgentMessage user connId msg `catchError` (liftIO . print)
|
||||
withLock l . void . runExceptT $
|
||||
processAgentMessage user connId msg `catchError` (liftIO . print)
|
||||
|
||||
subscribeUserConnections :: (MonadUnliftIO m, MonadReader ChatController m) => m ()
|
||||
subscribeUserConnections = void . runExceptT $ do
|
||||
|
||||
@@ -28,7 +28,8 @@ data ChatController = ChatController
|
||||
idsDrg :: TVar ChaChaDRG,
|
||||
inputQ :: TBQueue InputEvent,
|
||||
notifyQ :: TBQueue Notification,
|
||||
sendNotification :: Notification -> IO ()
|
||||
sendNotification :: Notification -> IO (),
|
||||
chatLock :: TMVar ()
|
||||
}
|
||||
|
||||
data InputEvent = InputCommand String | InputControl Char
|
||||
|
||||
Reference in New Issue
Block a user