mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-04-27 19:25:13 +00:00
agent store: add update queue methods (#19)
This commit is contained in:
@@ -212,12 +212,12 @@ processSMPTransmission c@AgentClient {sndQ} (srv, rId, cmd) = do
|
||||
print cmd
|
||||
where
|
||||
secureQueue :: ReceiveQueue -> SMP.SenderKey -> m ()
|
||||
secureQueue ReceiveQueue {rcvId, rcvPrivateKey} senderKey = do
|
||||
withStore $ \st -> updateReceiveQueueStatus st rcvId Confirmed
|
||||
secureQueue rq@ReceiveQueue {rcvPrivateKey} senderKey = do
|
||||
withStore $ \st -> updateRcvQueueStatus st rq Confirmed
|
||||
-- TODO update sender key in the store
|
||||
smp <- getSMPServerClient c srv
|
||||
liftSMP $ secureSMPQueue smp rcvPrivateKey rId senderKey
|
||||
withStore $ \st -> updateReceiveQueueStatus st rcvId Secured
|
||||
withStore $ \st -> updateRcvQueueStatus st rq Secured
|
||||
|
||||
decryptMessage :: MonadUnliftIO m => PrivateKey -> ByteString -> m ByteString
|
||||
decryptMessage _decryptKey = return
|
||||
@@ -272,12 +272,12 @@ sendConfirmation ::
|
||||
SendQueue ->
|
||||
SMP.SenderKey ->
|
||||
m ()
|
||||
sendConfirmation c SendQueue {server, sndId} senderKey = do
|
||||
sendConfirmation c sq@SendQueue {server, sndId} senderKey = do
|
||||
-- TODO send initial confirmation with signature - change in SMP server
|
||||
smp <- getSMPServerClient c server
|
||||
msg <- mkConfirmation
|
||||
liftSMP $ sendSMPMessage smp "" sndId msg
|
||||
withStore $ \st -> updateSendQueueStatus st sndId Confirmed
|
||||
withStore $ \st -> updateSndQueueStatus st sq Confirmed
|
||||
where
|
||||
mkConfirmation :: m SMP.MsgBody
|
||||
mkConfirmation = do
|
||||
@@ -291,11 +291,11 @@ sendHello ::
|
||||
AgentClient ->
|
||||
SendQueue ->
|
||||
m ()
|
||||
sendHello c SendQueue {server, sndId, sndPrivateKey, encryptKey} = do
|
||||
sendHello c sq@SendQueue {server, sndId, sndPrivateKey, encryptKey} = do
|
||||
smp <- getSMPServerClient c server
|
||||
msg <- mkHello "" $ AckMode On -- TODO verifyKey
|
||||
_send smp 20 msg
|
||||
withStore $ \st -> updateSendQueueStatus st sndId Active
|
||||
withStore $ \st -> updateSndQueueStatus st sq Active
|
||||
where
|
||||
mkHello :: PublicKey -> AckMode -> m ByteString
|
||||
mkHello verifyKey ackMode =
|
||||
|
||||
@@ -103,9 +103,8 @@ class Monad m => MonadAgentStore s m where
|
||||
addSndQueue :: s -> ConnAlias -> SendQueue -> m ()
|
||||
addRcvQueue :: s -> ConnAlias -> ReceiveQueue -> m ()
|
||||
removeSndAuth :: s -> ConnAlias -> m ()
|
||||
updateQueueStatus :: s -> ConnAlias -> QueueDirection -> QueueStatus -> m ()
|
||||
updateReceiveQueueStatus :: s -> RecipientId -> QueueStatus -> m ()
|
||||
updateSendQueueStatus :: s -> SenderId -> QueueStatus -> m ()
|
||||
updateRcvQueueStatus :: s -> ReceiveQueue -> QueueStatus -> m ()
|
||||
updateSndQueueStatus :: s -> SendQueue -> QueueStatus -> m ()
|
||||
createMsg :: s -> ConnAlias -> QueueDirection -> AgentMsgId -> AMessage -> m ()
|
||||
getLastMsg :: s -> ConnAlias -> QueueDirection -> m MessageDelivery
|
||||
getMsg :: s -> ConnAlias -> QueueDirection -> AgentMsgId -> m MessageDelivery
|
||||
|
||||
@@ -39,6 +39,7 @@ import Simplex.Messaging.Util
|
||||
import Text.Read
|
||||
import qualified UnliftIO.Exception as E
|
||||
import UnliftIO.STM
|
||||
import Network.Socket
|
||||
|
||||
addRcvQueueQuery :: Query
|
||||
addRcvQueueQuery =
|
||||
@@ -310,29 +311,39 @@ deleteConnection store connAlias = do
|
||||
"DELETE FROM connections WHERE conn_alias = ?"
|
||||
(Only connAlias)
|
||||
|
||||
updateRcvQueueStatus :: MonadUnliftIO m => SQLiteStore -> QueueRowId -> QueueStatus -> m ()
|
||||
updateRcvQueueStatus store rcvQueueId status =
|
||||
updateReceiveQueueStatus :: MonadUnliftIO m => SQLiteStore -> RecipientId -> HostName -> Maybe ServiceName -> QueueStatus -> m ()
|
||||
updateReceiveQueueStatus store rcvQueueId host port status =
|
||||
executeWithLock
|
||||
store
|
||||
rcvQueuesLock
|
||||
[s|
|
||||
UPDATE receive_queues
|
||||
SET status = ?
|
||||
WHERE receive_queue_id = ?;
|
||||
WHERE rcv_id = ?
|
||||
AND server_id IN (
|
||||
SELECT server_id
|
||||
FROM servers
|
||||
WHERE host = ? AND port = ?
|
||||
);
|
||||
|]
|
||||
(Only status :. Only rcvQueueId)
|
||||
(Only status :. Only rcvQueueId :. Only host :. Only port)
|
||||
|
||||
updateSndQueueStatus :: MonadUnliftIO m => SQLiteStore -> QueueRowId -> QueueStatus -> m ()
|
||||
updateSndQueueStatus store sndQueueId status =
|
||||
updateSendQueueStatus :: MonadUnliftIO m => SQLiteStore -> SenderId -> HostName -> Maybe ServiceName -> QueueStatus -> m ()
|
||||
updateSendQueueStatus store sndQueueId host port status =
|
||||
executeWithLock
|
||||
store
|
||||
sndQueuesLock
|
||||
[s|
|
||||
UPDATE send_queues
|
||||
SET status = ?
|
||||
WHERE send_queue_id = ?;
|
||||
WHERE snd_id = ?
|
||||
AND server_id IN (
|
||||
SELECT server_id
|
||||
FROM servers
|
||||
WHERE host = ? AND port = ?
|
||||
);
|
||||
|]
|
||||
(Only status :. Only sndQueueId)
|
||||
(Only status :. Only sndQueueId :. Only host :. Only port)
|
||||
|
||||
instance ToField QueueDirection where toField = toField . show
|
||||
|
||||
@@ -426,27 +437,17 @@ instance (MonadUnliftIO m, MonadError StoreError m) => MonadAgentStore SQLiteSto
|
||||
when (isNothing rcvQId && isNothing sndQId) $ throwError SEBadConn
|
||||
|
||||
removeSndAuth :: SQLiteStore -> ConnAlias -> m ()
|
||||
removeSndAuth _st _connAlias = throwError SEInternal
|
||||
removeSndAuth _st _connAlias = throwError SENotImplemented
|
||||
|
||||
updateQueueStatus :: SQLiteStore -> ConnAlias -> QueueDirection -> QueueStatus -> m ()
|
||||
updateQueueStatus st connAlias qDirection status = do
|
||||
case qDirection of
|
||||
RCV -> do
|
||||
(rcvQId, _) <- getConnection st connAlias
|
||||
case rcvQId of
|
||||
Just qId -> updateRcvQueueStatus st qId status
|
||||
Nothing -> throwError SEBadQueueDirection
|
||||
SND -> do
|
||||
(_, sndQId) <- getConnection st connAlias
|
||||
case sndQId of
|
||||
Just qId -> updateSndQueueStatus st qId status
|
||||
Nothing -> throwError SEBadQueueDirection
|
||||
-- TODO throw error if queue doesn't exist
|
||||
updateRcvQueueStatus :: SQLiteStore -> ReceiveQueue -> QueueStatus -> m ()
|
||||
updateRcvQueueStatus st ReceiveQueue {rcvId, server = SMPServer {host, port}} status =
|
||||
updateReceiveQueueStatus st rcvId host port status
|
||||
|
||||
updateReceiveQueueStatus :: SQLiteStore -> RecipientId -> QueueStatus -> m ()
|
||||
updateReceiveQueueStatus _st _rId _status = throwError SENotImplemented
|
||||
|
||||
updateSendQueueStatus :: SQLiteStore -> SenderId -> QueueStatus -> m ()
|
||||
updateSendQueueStatus _st _sId _status = throwError SENotImplemented
|
||||
-- TODO throw error if queue doesn't exist
|
||||
updateSndQueueStatus :: SQLiteStore -> SendQueue -> QueueStatus -> m ()
|
||||
updateSndQueueStatus st SendQueue {sndId, server = SMPServer {host, port}} status =
|
||||
updateSendQueueStatus st sndId host port status
|
||||
|
||||
-- TODO decrease duplication of queue direction checks?
|
||||
createMsg :: SQLiteStore -> ConnAlias -> QueueDirection -> AgentMsgId -> AMessage -> m ()
|
||||
|
||||
+36
-48
@@ -50,12 +50,12 @@ storeTests = withStore do
|
||||
describe "Receive connection" testDeleteConnReceive
|
||||
describe "Send connection" testDeleteConnSend
|
||||
describe "Duplex connection" testDeleteConnDuplex
|
||||
describe "updateQueueStatus" do
|
||||
describe "Receive connection" testUpdateQueueStatusConnReceive
|
||||
describe "Send connection" testUpdateQueueStatusConnSend
|
||||
describe "Update queue status" do
|
||||
describe "Receive queue" testupdateRcvQueueStatus
|
||||
describe "Send queue" testupdateSndQueueStatus
|
||||
describe "Duplex connection" testUpdateQueueStatusConnDuplex
|
||||
describe "Bad queue direction - SND" testUpdateQueueStatusBadDirectionSnd
|
||||
describe "Bad queue direction - RCV" testUpdateQueueStatusBadDirectionRcv
|
||||
xdescribe "Nonexistent send queue" testUpdateNonexistentSendQueueStatus
|
||||
xdescribe "Nonexistent receive queue" testUpdateNonexistentReceiveQueueStatus
|
||||
describe "createMsg" do
|
||||
describe "A_MSG in RCV direction" testCreateMsgRcv
|
||||
describe "A_MSG in SND direction" testCreateMsgSnd
|
||||
@@ -304,9 +304,9 @@ testDeleteConnDuplex = do
|
||||
getConn store "conn1"
|
||||
`throwsError` SEInternal
|
||||
|
||||
testUpdateQueueStatusConnReceive :: SpecWith SQLiteStore
|
||||
testUpdateQueueStatusConnReceive = do
|
||||
it "should update status of receive queue in receive connection" $ \store -> do
|
||||
testupdateRcvQueueStatus :: SpecWith SQLiteStore
|
||||
testupdateRcvQueueStatus = do
|
||||
it "should update status of receive queue" $ \store -> do
|
||||
let rcvQueue =
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
@@ -323,14 +323,14 @@ testUpdateQueueStatusConnReceive = do
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCReceive (ReceiveConnection "conn1" rcvQueue)
|
||||
updateQueueStatus store "conn1" RCV Confirmed
|
||||
updateRcvQueueStatus store rcvQueue Confirmed
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCReceive (ReceiveConnection "conn1" rcvQueue {status = Confirmed})
|
||||
|
||||
testUpdateQueueStatusConnSend :: SpecWith SQLiteStore
|
||||
testUpdateQueueStatusConnSend = do
|
||||
it "should update status of send queue in send connection" $ \store -> do
|
||||
testupdateSndQueueStatus :: SpecWith SQLiteStore
|
||||
testupdateSndQueueStatus = do
|
||||
it "should update status of send queue" $ \store -> do
|
||||
let sndQueue =
|
||||
SendQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
@@ -345,7 +345,7 @@ testUpdateQueueStatusConnSend = do
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCSend (SendConnection "conn1" sndQueue)
|
||||
updateQueueStatus store "conn1" SND Confirmed
|
||||
updateSndQueueStatus store sndQueue Confirmed
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCSend (SendConnection "conn1" sndQueue {status = Confirmed})
|
||||
@@ -381,18 +381,34 @@ testUpdateQueueStatusConnDuplex = do
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCDuplex (DuplexConnection "conn1" rcvQueue sndQueue)
|
||||
updateQueueStatus store "conn1" RCV Secured
|
||||
updateRcvQueueStatus store rcvQueue Secured
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCDuplex (DuplexConnection "conn1" rcvQueue {status = Secured} sndQueue)
|
||||
updateQueueStatus store "conn1" SND Confirmed
|
||||
updateSndQueueStatus store sndQueue Confirmed
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCDuplex (DuplexConnection "conn1" rcvQueue {status = Secured} sndQueue {status = Confirmed})
|
||||
|
||||
testUpdateQueueStatusBadDirectionSnd :: SpecWith SQLiteStore
|
||||
testUpdateQueueStatusBadDirectionSnd = do
|
||||
it "should throw error on attempt to update status of send queue in receive connection" $ \store -> do
|
||||
testUpdateNonexistentSendQueueStatus :: SpecWith SQLiteStore
|
||||
testUpdateNonexistentSendQueueStatus = do
|
||||
it "should throw error on attempt to update status of nonexistent send queue" $ \store -> do
|
||||
let sndQueue =
|
||||
SendQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
sndId = "1234",
|
||||
sndPrivateKey = "abcd",
|
||||
encryptKey = "dcba",
|
||||
signKey = "edcb",
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
}
|
||||
updateSndQueueStatus store sndQueue Confirmed
|
||||
`throwsError` SEInternal
|
||||
|
||||
testUpdateNonexistentReceiveQueueStatus :: SpecWith SQLiteStore
|
||||
testUpdateNonexistentReceiveQueueStatus = do
|
||||
it "should throw error on attempt to update status of nonexistent receive queue" $ \store -> do
|
||||
let rcvQueue =
|
||||
ReceiveQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
@@ -405,36 +421,8 @@ testUpdateQueueStatusBadDirectionSnd = do
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
}
|
||||
createRcvConn store "conn1" rcvQueue
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCReceive (ReceiveConnection "conn1" rcvQueue)
|
||||
updateQueueStatus store "conn1" SND Confirmed
|
||||
`throwsError` SEBadQueueDirection
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCReceive (ReceiveConnection "conn1" rcvQueue)
|
||||
|
||||
testUpdateQueueStatusBadDirectionRcv :: SpecWith SQLiteStore
|
||||
testUpdateQueueStatusBadDirectionRcv = do
|
||||
it "should throw error on attempt to update status of receive queue in send connection" $ \store -> do
|
||||
let sndQueue =
|
||||
SendQueue
|
||||
{ server = SMPServer "smp.simplex.im" (Just "5223") (Just "1234"),
|
||||
sndId = "1234",
|
||||
sndPrivateKey = "abcd",
|
||||
encryptKey = "dcba",
|
||||
signKey = "edcb",
|
||||
status = New,
|
||||
ackMode = AckMode On
|
||||
}
|
||||
createSndConn store "conn1" sndQueue
|
||||
`returnsResult` ()
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCSend (SendConnection "conn1" sndQueue)
|
||||
updateQueueStatus store "conn1" RCV Confirmed
|
||||
`throwsError` SEBadQueueDirection
|
||||
getConn store "conn1"
|
||||
`returnsResult` SomeConn SCSend (SendConnection "conn1" sndQueue)
|
||||
updateRcvQueueStatus store rcvQueue Confirmed
|
||||
`throwsError` SEInternal
|
||||
|
||||
testCreateMsgRcv :: SpecWith SQLiteStore
|
||||
testCreateMsgRcv = do
|
||||
|
||||
Reference in New Issue
Block a user