mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-29 16:04:08 +00:00
add addRcvQueue and tests
This commit is contained in:
@@ -100,7 +100,7 @@ class Monad m => MonadAgentStore s m where
|
||||
getConn :: s -> ConnAlias -> m (Either StoreError SomeConn)
|
||||
deleteConn :: s -> ConnAlias -> m (Either StoreError ())
|
||||
addSndQueue :: s -> ConnAlias -> SendQueue -> m (Either StoreError ())
|
||||
addRcvQueue :: s -> ConnAlias -> SendQueue -> m (Either StoreError ())
|
||||
addRcvQueue :: s -> ConnAlias -> ReceiveQueue -> m (Either StoreError ())
|
||||
removeSndAuth :: s -> ConnAlias -> m (Either StoreError ())
|
||||
updateQueueStatus :: s -> ConnAlias -> QueueDirection -> QueueStatus -> m (Either StoreError ())
|
||||
createMsg :: s -> ConnAlias -> QueueDirection -> AMessage -> m (Either StoreError MessageDelivery)
|
||||
|
||||
@@ -96,8 +96,8 @@ insertWithLock st tableLock queryStr q = do
|
||||
DB.execute c queryStr q
|
||||
DB.lastInsertRowId c
|
||||
|
||||
updateWithLock :: (MonadUnliftIO m, ToRow q) => SQLiteStore -> (SQLiteStore -> TMVar ()) -> DB.Query -> q -> m ()
|
||||
updateWithLock st tableLock queryStr q = do
|
||||
executeWithLock :: (MonadUnliftIO m, ToRow q) => SQLiteStore -> (SQLiteStore -> TMVar ()) -> DB.Query -> q -> m ()
|
||||
executeWithLock st tableLock queryStr q = do
|
||||
withLock st tableLock $ \c -> liftIO $ do
|
||||
DB.execute c queryStr q
|
||||
|
||||
@@ -210,7 +210,7 @@ insertRcvConnection store connAlias rcvQueueId =
|
||||
|
||||
updateRcvConnectionWithSndQueue :: MonadUnliftIO m => SQLiteStore -> ConnAlias -> QueueRowId -> m ()
|
||||
updateRcvConnectionWithSndQueue store connAlias sndQueueId =
|
||||
updateWithLock
|
||||
executeWithLock
|
||||
store
|
||||
connectionsLock
|
||||
[s|
|
||||
@@ -247,6 +247,18 @@ insertSndConnection store connAlias sndQueueId =
|
||||
"INSERT INTO connections (conn_alias, receive_queue_id, send_queue_id) VALUES (?,NULL,?);"
|
||||
(Only connAlias :. Only sndQueueId)
|
||||
|
||||
updateSndConnectionWithRcvQueue :: MonadUnliftIO m => SQLiteStore -> ConnAlias -> QueueRowId -> m ()
|
||||
updateSndConnectionWithRcvQueue store connAlias rcvQueueId =
|
||||
executeWithLock
|
||||
store
|
||||
connectionsLock
|
||||
[s|
|
||||
UPDATE connections
|
||||
SET receive_queue_id = ?
|
||||
WHERE conn_alias = ?;
|
||||
|]
|
||||
(Only rcvQueueId :. Only connAlias)
|
||||
|
||||
getConnection :: MonadUnliftIO m => SQLiteStore -> ConnAlias -> m (Either StoreError (Maybe QueueRowId, Maybe QueueRowId))
|
||||
getConnection SQLiteStore {conn} connAlias = liftIO $ do
|
||||
r <-
|
||||
@@ -316,3 +328,22 @@ instance MonadUnliftIO m => MonadAgentStore SQLiteStore m where
|
||||
updateConn servId =
|
||||
insertSndQueue st servId sndQueue
|
||||
>>= updateRcvConnectionWithSndQueue st connAlias
|
||||
|
||||
-- TODO make transactional
|
||||
addRcvQueue :: SQLiteStore -> ConnAlias -> ReceiveQueue -> m (Either StoreError ())
|
||||
addRcvQueue st connAlias rcvQueue =
|
||||
getConn st connAlias
|
||||
>>= either (return . Left) checkUpdateConn
|
||||
where
|
||||
checkUpdateConn :: SomeConn -> m (Either StoreError ())
|
||||
checkUpdateConn = \case
|
||||
SomeConn SCDuplex _ -> return $ Left (SEBadConnType CDuplex)
|
||||
SomeConn SCReceive _ -> return $ Left (SEBadConnType CReceive)
|
||||
SomeConn SCSend _ ->
|
||||
upsertServer st (server (rcvQueue :: ReceiveQueue))
|
||||
>>= either (return . Left) (fmap Right . updateConn)
|
||||
|
||||
updateConn :: SMPServerId -> m ()
|
||||
updateConn servId =
|
||||
insertRcvQueue st servId rcvQueue
|
||||
>>= updateSndConnectionWithRcvQueue st connAlias
|
||||
|
||||
@@ -19,7 +19,6 @@ servers =
|
||||
)
|
||||
|]
|
||||
|
||||
-- TODO unique constraints on (server_id, rcv_id) and (server_id, snd_id)
|
||||
receiveQueues :: Query
|
||||
receiveQueues =
|
||||
[s|
|
||||
@@ -50,7 +49,8 @@ sendQueues =
|
||||
encrypt_key BLOB NOT NULL,
|
||||
sign_key BLOB NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
ack_mode INTEGER NOT NULL
|
||||
ack_mode INTEGER NOT NULL,
|
||||
UNIQUE (server_id, snd_id)
|
||||
)
|
||||
|]
|
||||
|
||||
|
||||
Reference in New Issue
Block a user