move message store to STM

This commit is contained in:
Evgeny Poberezkin
2020-10-21 10:32:29 +01:00
parent ca95a9fbfe
commit 0c17422fa1
2 changed files with 31 additions and 37 deletions
+14 -17
View File
@@ -6,7 +6,6 @@
module MsgStore.STM where
import Control.Monad.IO.Unlift
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import MsgStore
@@ -22,9 +21,9 @@ type STMMsgStore = TVar MsgStoreData
newMsgStore :: STM STMMsgStore
newMsgStore = newTVar $ MsgStoreData M.empty
instance MonadUnliftIO m => MonadMsgStore STMMsgStore MsgQueue m where
getMsgQueue :: STMMsgStore -> RecipientId -> m MsgQueue
getMsgQueue store rId = atomically $ do
instance MonadMsgStore STMMsgStore MsgQueue STM where
getMsgQueue :: STMMsgStore -> RecipientId -> STM MsgQueue
getMsgQueue store rId = do
m <- messages <$> readTVar store
maybe (newQ m) return $ M.lookup rId m
where
@@ -33,23 +32,21 @@ instance MonadUnliftIO m => MonadMsgStore STMMsgStore MsgQueue m where
writeTVar store . MsgStoreData $ M.insert rId q m'
return q
delMsgQueue :: STMMsgStore -> RecipientId -> m ()
delMsgQueue store rId = atomically . modifyTVar store $
delMsgQueue :: STMMsgStore -> RecipientId -> STM ()
delMsgQueue store rId = modifyTVar store $
\(MsgStoreData ms) ->
MsgStoreData $ M.delete rId ms
instance MonadUnliftIO m => MonadMsgQueue MsgQueue m where
writeMsg :: MsgQueue -> Message -> m ()
writeMsg (MsgQueue q) msg = atomically $ writeTQueue q msg
instance MonadMsgQueue MsgQueue STM where
writeMsg :: MsgQueue -> Message -> STM ()
writeMsg (MsgQueue q) = writeTQueue q
tryPeekMsg :: MsgQueue -> m (Maybe Message)
tryPeekMsg = atomically . tryPeekTQueue . msgQueue
tryPeekMsg :: MsgQueue -> STM (Maybe Message)
tryPeekMsg = tryPeekTQueue . msgQueue
peekMsg :: MsgQueue -> m Message
peekMsg = atomically . peekTQueue . msgQueue
peekMsg :: MsgQueue -> STM Message
peekMsg = peekTQueue . msgQueue
-- atomic delete (== read) last and peek next message if available
tryDelPeekMsg :: MsgQueue -> m (Maybe Message)
tryDelPeekMsg (MsgQueue q) =
atomically $
tryReadTQueue q >> tryPeekTQueue q
tryDelPeekMsg :: MsgQueue -> STM (Maybe Message)
tryDelPeekMsg (MsgQueue q) = tryReadTQueue q >> tryPeekTQueue q
+17 -20
View File
@@ -200,20 +200,18 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
storeMessage c = case status c of
ConnActive -> do
ms <- asks msgStore
q <- getMsgQueue ms (recipientId c)
mkMessage >>= writeMsg q
q <- atomically $ getMsgQueue ms (recipientId c)
mkMessage >>= atomically . writeMsg q
return OK
ConnOff -> return $ ERR AUTH
deliverMessage :: (MsgQueue -> m (Maybe Message)) -> RecipientId -> m Signed
deliverMessage :: (MsgQueue -> STM (Maybe Message)) -> RecipientId -> m Signed
deliverMessage tryPeek rId = do
ms <- asks msgStore
q <- getMsgQueue ms rId
tryPeek q >>= \case
q <- atomically $ getMsgQueue ms rId
atomically (tryPeek q) >>= \case
Just msg -> do
atomically $ do
sub <- M.lookup rId <$> readTVar subscriptions
forM_ sub $ \Sub {delivered} -> tryPutTMVar delivered ()
atomically setDelivered
return $ msgResponse rId msg
Nothing -> forkSub q >> return ok
where
@@ -229,21 +227,20 @@ client clnt@Client {subscriptions, rcvQ, sndQ} Server {subscribedQ} =
s -> s
_ -> return ()
subscriber :: MsgQueue -> m ()
subscriber q = atomically $ do
msg <- peekMsg q
writeTBQueue sndQ $ msgResponse rId msg
setSub (\s -> s {subThread = NoSub})
setDelivered
setSub :: (Sub -> Sub) -> STM ()
setSub f = modifyTVar subscriptions $ M.adjust f rId
subscriber :: MsgQueue -> m ()
subscriber q = do
msg <- peekMsg q
atomically $ do
writeTBQueue sndQ $ msgResponse rId msg
-- setSub (\s -> s {subThread = NoSub})
cs <- readTVar subscriptions
let sub = M.lookup rId cs
forM_ sub $ \s@Sub {delivered} -> do
void $ tryPutTMVar delivered ()
let cs' = M.insert rId s {subThread = NoSub} cs
writeTVar subscriptions cs'
setDelivered :: STM ()
setDelivered =
readTVar subscriptions
>>= mapM_ (\s -> tryPutTMVar (delivered s) ()) . M.lookup rId
mkSigned :: ConnId -> Command 'Broker -> Signed
mkSigned cId command = (cId, Cmd SBroker command)