Files
simplexmq/src/Simplex/Messaging/Agent/Lock.hs
T
Evgeny 0414ea59f0 smp server: journal message store (#1370)
* smp server: remove STM function from MsgStore

* polymorphic MsgStore

* jourmal storage for messages (WIP)

* more journal, test setup

* writeMsg

* test

* tryDelMsg

* delMsgQueue

* remove MsgStoreClass instance of existential wrapper for Msg stores

* store config

* extract common logic out of store instances

* add store type to config

* open journals, cache last message, tests pass

* CLI commands

* refactor import/export messages

* cli commands to import/export journal message store

* export journal without draining, import/export tests

* journal command

* import/export progress

* better progress info

* only log queue state once when importing

* logs

* handle IO errors in journal store, return as STORE error

* recover from state file errors

* fix message files after crash

* fix messages folder
2024-10-21 11:50:30 +01:00

59 lines
1.9 KiB
Haskell

module Simplex.Messaging.Agent.Lock
( Lock,
createLock,
createLockIO,
withLock,
withLock',
withGetLock,
withGetLocks,
)
where
import Control.Monad (void)
import Control.Monad.Except (ExceptT (..), runExceptT)
import Control.Monad.IO.Unlift
import Data.Functor (($>))
import Data.Set (Set)
import qualified Data.Set as S
import UnliftIO.Async (forConcurrently)
import qualified UnliftIO.Exception as E
import UnliftIO.STM
type Lock = TMVar String
createLock :: STM Lock
createLock = newEmptyTMVar
{-# INLINE createLock #-}
createLockIO :: IO Lock
createLockIO = newEmptyTMVarIO
{-# INLINE createLockIO #-}
withLock :: MonadUnliftIO m => Lock -> String -> ExceptT e m a -> ExceptT e m a
withLock lock name = ExceptT . withLock' lock name . runExceptT
{-# INLINE withLock #-}
withLock' :: MonadUnliftIO m => Lock -> String -> m a -> m a
withLock' lock name =
E.bracket_
(atomically $ putTMVar lock name)
(void . atomically $ takeTMVar lock)
withGetLock :: MonadUnliftIO m => (k -> STM Lock) -> k -> String -> m a -> m a
withGetLock getLock key name a =
E.bracket
(atomically $ getPutLock getLock key name)
(atomically . takeTMVar)
(const a)
withGetLocks :: MonadUnliftIO m => (k -> STM Lock) -> Set k -> String -> m a -> m a
withGetLocks getLock keys name = E.bracket holdLocks releaseLocks . const
where
holdLocks = forConcurrently (S.toList keys) $ \key -> atomically $ getPutLock getLock key name
releaseLocks = mapM_ (atomically . takeTMVar)
-- getLock and putTMVar can be in one transaction on the assumption that getLock doesn't write in case the lock already exists,
-- and in case it is created and added to some shared resource (we use TMap) it also helps avoid contention for the newly created lock.
getPutLock :: (k -> STM Lock) -> k -> String -> STM Lock
getPutLock getLock key name = getLock key >>= \l -> putTMVar l name $> l