From 0c17422fa14c1638ca49b78bfa3570a7e78fbe9d Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Wed, 21 Oct 2020 10:32:29 +0100 Subject: [PATCH] move message store to STM --- src/MsgStore/STM.hs | 31 ++++++++++++++----------------- src/Server.hs | 37 +++++++++++++++++-------------------- 2 files changed, 31 insertions(+), 37 deletions(-) diff --git a/src/MsgStore/STM.hs b/src/MsgStore/STM.hs index 0c3f093cb..392b7e686 100644 --- a/src/MsgStore/STM.hs +++ b/src/MsgStore/STM.hs @@ -6,7 +6,6 @@ module MsgStore.STM where -import Control.Monad.IO.Unlift import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import MsgStore @@ -22,9 +21,9 @@ type STMMsgStore = TVar MsgStoreData newMsgStore :: STM STMMsgStore newMsgStore = newTVar $ MsgStoreData M.empty -instance MonadUnliftIO m => MonadMsgStore STMMsgStore MsgQueue m where - getMsgQueue :: STMMsgStore -> RecipientId -> m MsgQueue - getMsgQueue store rId = atomically $ do +instance MonadMsgStore STMMsgStore MsgQueue STM where + getMsgQueue :: STMMsgStore -> RecipientId -> STM MsgQueue + getMsgQueue store rId = do m <- messages <$> readTVar store maybe (newQ m) return $ M.lookup rId m where @@ -33,23 +32,21 @@ instance MonadUnliftIO m => MonadMsgStore STMMsgStore MsgQueue m where writeTVar store . MsgStoreData $ M.insert rId q m' return q - delMsgQueue :: STMMsgStore -> RecipientId -> m () - delMsgQueue store rId = atomically . modifyTVar store $ + delMsgQueue :: STMMsgStore -> RecipientId -> STM () + delMsgQueue store rId = modifyTVar store $ \(MsgStoreData ms) -> MsgStoreData $ M.delete rId ms -instance MonadUnliftIO m => MonadMsgQueue MsgQueue m where - writeMsg :: MsgQueue -> Message -> m () - writeMsg (MsgQueue q) msg = atomically $ writeTQueue q msg +instance MonadMsgQueue MsgQueue STM where + writeMsg :: MsgQueue -> Message -> STM () + writeMsg (MsgQueue q) = writeTQueue q - tryPeekMsg :: MsgQueue -> m (Maybe Message) - tryPeekMsg = atomically . tryPeekTQueue . msgQueue + tryPeekMsg :: MsgQueue -> STM (Maybe Message) + tryPeekMsg = tryPeekTQueue . msgQueue - peekMsg :: MsgQueue -> m Message - peekMsg = atomically . peekTQueue . msgQueue + peekMsg :: MsgQueue -> STM Message + peekMsg = peekTQueue . msgQueue -- atomic delete (== read) last and peek next message if available - tryDelPeekMsg :: MsgQueue -> m (Maybe Message) - tryDelPeekMsg (MsgQueue q) = - atomically $ - tryReadTQueue q >> tryPeekTQueue q + tryDelPeekMsg :: MsgQueue -> STM (Maybe Message) + tryDelPeekMsg (MsgQueue q) = tryReadTQueue q >> tryPeekTQueue q diff --git a/src/Server.hs b/src/Server.hs index 862ca6dd4..0642ed391 100644 --- a/src/Server.hs +++ b/src/Server.hs @@ -200,20 +200,18 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = storeMessage c = case status c of ConnActive -> do ms <- asks msgStore - q <- getMsgQueue ms (recipientId c) - mkMessage >>= writeMsg q + q <- atomically $ getMsgQueue ms (recipientId c) + mkMessage >>= atomically . writeMsg q return OK ConnOff -> return $ ERR AUTH - deliverMessage :: (MsgQueue -> m (Maybe Message)) -> RecipientId -> m Signed + deliverMessage :: (MsgQueue -> STM (Maybe Message)) -> RecipientId -> m Signed deliverMessage tryPeek rId = do ms <- asks msgStore - q <- getMsgQueue ms rId - tryPeek q >>= \case + q <- atomically $ getMsgQueue ms rId + atomically (tryPeek q) >>= \case Just msg -> do - atomically $ do - sub <- M.lookup rId <$> readTVar subscriptions - forM_ sub $ \Sub {delivered} -> tryPutTMVar delivered () + atomically setDelivered return $ msgResponse rId msg Nothing -> forkSub q >> return ok where @@ -229,21 +227,20 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} = s -> s _ -> return () + subscriber :: MsgQueue -> m () + subscriber q = atomically $ do + msg <- peekMsg q + writeTBQueue sndQ $ msgResponse rId msg + setSub (\s -> s {subThread = NoSub}) + setDelivered + setSub :: (Sub -> Sub) -> STM () setSub f = modifyTVar subscriptions $ M.adjust f rId - subscriber :: MsgQueue -> m () - subscriber q = do - msg <- peekMsg q - atomically $ do - writeTBQueue sndQ $ msgResponse rId msg - -- setSub (\s -> s {subThread = NoSub}) - cs <- readTVar subscriptions - let sub = M.lookup rId cs - forM_ sub $ \s@Sub {delivered} -> do - void $ tryPutTMVar delivered () - let cs' = M.insert rId s {subThread = NoSub} cs - writeTVar subscriptions cs' + setDelivered :: STM () + setDelivered = + readTVar subscriptions + >>= mapM_ (\s -> tryPutTMVar (delivered s) ()) . M.lookup rId mkSigned :: ConnId -> Command 'Broker -> Signed mkSigned cId command = (cId, Cmd SBroker command)