mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-25 20:44:49 +00:00
deduplicate connections for locking
This commit is contained in:
@@ -130,6 +130,7 @@ import qualified Data.List.NonEmpty as L
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, mapMaybe)
|
||||
import qualified Data.Set as S
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Data.Time.Clock
|
||||
@@ -898,7 +899,7 @@ sendMessagesB c reqs = withConnLocks c connIds "sendMessages" $ do
|
||||
prepareMsgs cData sqs
|
||||
| ratchetSyncSendProhibited cData = Left $ CMD PROHIBITED
|
||||
| otherwise = Right (cData, sqs, msgFlags, A_MSG msg)
|
||||
connIds = map (\(connId, _, _) -> connId) $ rights $ toList reqs
|
||||
connIds = foldl' (\cs -> either (\_ -> cs) (\(connId, _, _) -> S.insert connId cs)) S.empty reqs
|
||||
|
||||
-- / async command processing v v v
|
||||
|
||||
|
||||
@@ -661,13 +661,13 @@ withConnLock AgentClient {connLocks} connId name = withLockMap_ connLocks connId
|
||||
withInvLock :: MonadUnliftIO m => AgentClient -> ByteString -> String -> m a -> m a
|
||||
withInvLock AgentClient {invLocks} = withLockMap_ invLocks
|
||||
|
||||
withConnLocks :: MonadUnliftIO m => AgentClient -> [ConnId] -> String -> m a -> m a
|
||||
withConnLocks AgentClient {connLocks} = withLocksMap_ connLocks . filter (not . B.null)
|
||||
withConnLocks :: MonadUnliftIO m => AgentClient -> Set ConnId -> String -> m a -> m a
|
||||
withConnLocks AgentClient {connLocks} = withLocksMap_ connLocks . S.filter (not . B.null)
|
||||
|
||||
withLockMap_ :: (Ord k, MonadUnliftIO m) => TMap k Lock -> k -> String -> m a -> m a
|
||||
withLockMap_ = withGetLock . getMapLock
|
||||
|
||||
withLocksMap_ :: (Ord k, MonadUnliftIO m) => TMap k Lock -> [k] -> String -> m a -> m a
|
||||
withLocksMap_ :: (Ord k, MonadUnliftIO m) => TMap k Lock -> Set k -> String -> m a -> m a
|
||||
withLocksMap_ = withGetLocks . getMapLock
|
||||
|
||||
getMapLock :: Ord k => TMap k Lock -> k -> STM Lock
|
||||
|
||||
@@ -12,6 +12,8 @@ where
|
||||
import Control.Monad (void)
|
||||
import Control.Monad.IO.Unlift
|
||||
import Data.Functor (($>))
|
||||
import Data.Set (Set)
|
||||
import qualified Data.Set as S
|
||||
import UnliftIO.Async (forConcurrently)
|
||||
import qualified UnliftIO.Exception as E
|
||||
import UnliftIO.STM
|
||||
@@ -35,10 +37,10 @@ withGetLock getLock key name a =
|
||||
(atomically . takeTMVar)
|
||||
(const a)
|
||||
|
||||
withGetLocks :: MonadUnliftIO m => (k -> STM Lock) -> [k] -> String -> m a -> m a
|
||||
withGetLocks :: MonadUnliftIO m => (k -> STM Lock) -> Set k -> String -> m a -> m a
|
||||
withGetLocks getLock keys name = E.bracket holdLocks releaseLocks . const
|
||||
where
|
||||
holdLocks = forConcurrently keys $ \key -> atomically $ getPutLock getLock key name
|
||||
holdLocks = forConcurrently (S.toList keys) $ \key -> atomically $ getPutLock getLock key name
|
||||
-- only this withGetLocks would be holding the locks,
|
||||
-- so it's safe to combine all lock releases into one transaction
|
||||
releaseLocks = atomically . mapM_ takeTMVar
|
||||
|
||||
Reference in New Issue
Block a user