agent: use IORef for DRG (#882)

This commit is contained in:
Evgeny Poberezkin
2023-10-31 23:52:13 +00:00
committed by GitHub
parent 0410948b56
commit b5f733d2db
6 changed files with 49 additions and 41 deletions
+2 -2
View File
@@ -104,7 +104,7 @@ closeXFTPAgent XFTPAgent {xftpRcvWorkers, xftpSndWorkers} = do
xftpReceiveFile' :: AgentMonad m => AgentClient -> UserId -> ValidFileDescription 'FRecipient -> Maybe CryptoFileArgs -> m RcvFileId
xftpReceiveFile' c userId (ValidFileDescription fd@FileDescription {chunks}) cfArgs = do
g <- asks idsDrg
g <- asks random
prefixPath <- getPrefixPath "rcv.xftp"
createDirectory prefixPath
let relPrefixPath = takeFileName prefixPath
@@ -283,7 +283,7 @@ notify c entId cmd = atomically $ writeTBQueue (subQ c) ("", entId, APC (sAEntit
xftpSendFile' :: AgentMonad m => AgentClient -> UserId -> CryptoFile -> Int -> m SndFileId
xftpSendFile' c userId file numRecipients = do
g <- asks idsDrg
g <- asks random
prefixPath <- getPrefixPath "snd.xftp"
createDirectory prefixPath
let relPrefixPath = takeFileName prefixPath
+5 -5
View File
@@ -473,7 +473,7 @@ newConnAsync c userId corrId enableNtfs cMode subMode = do
newConnNoQueues :: AgentMonad m => AgentClient -> UserId -> ConnId -> Bool -> SConnectionMode c -> m ConnId
newConnNoQueues c userId connId enableNtfs cMode = do
g <- asks idsDrg
g <- asks random
connAgentVersion <- asks $ maxVersion . smpAgentVRange . config
-- connection mode is determined by the accepting agent
let cData = ConnData {userId, connId, connAgentVersion, enableNtfs, duplexHandshake = Nothing, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk}
@@ -485,7 +485,7 @@ joinConnAsync c userId corrId enableNtfs cReqUri@(CRInvitationUri ConnReqUriData
aVRange <- asks $ smpAgentVRange . config
case crAgentVRange `compatibleVersion` aVRange of
Just (Compatible connAgentVersion) -> do
g <- asks idsDrg
g <- asks random
let duplexHS = connAgentVersion /= 1
cData = ConnData {userId, connId = "", connAgentVersion, enableNtfs, duplexHandshake = Just duplexHS, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk}
connId <- withStore c $ \db -> createNewConn db g cData SCMInvitation
@@ -619,7 +619,7 @@ joinConnSrv :: AgentMonad m => AgentClient -> UserId -> ConnId -> Bool -> Connec
joinConnSrv c userId connId enableNtfs inv@CRInvitationUri {} cInfo subMode srv =
withInvLock c (strEncode inv) "joinConnSrv" $ do
(aVersion, cData@ConnData {connAgentVersion}, q, rc, e2eSndParams) <- startJoinInvitation userId connId enableNtfs inv
g <- asks idsDrg
g <- asks random
connId' <- withStore c $ \db -> runExceptT $ do
connId' <- ExceptT $ createSndConn db g cData q
liftIO $ createRatchet db connId' rc
@@ -2061,7 +2061,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s
where
processConf connInfo senderConf duplexHS = do
let newConfirmation = NewConfirmation {connId, senderConf, ratchetState = rc'}
g <- asks idsDrg
g <- asks random
confId <- withStore c $ \db -> do
setHandshakeVersion db connId agentVersion duplexHS
createConfirmation db g newConfirmation
@@ -2238,7 +2238,7 @@ processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), v, s
logServer "<--" c srv rId "MSG <KEY>"
case conn' of
ContactConnection {} -> do
g <- asks idsDrg
g <- asks random
let newInv = NewInvitation {contactConnId = connId, connReq, recipientConnInfo = cInfo}
invId <- withStore c $ \db -> createInvitation db g newInv
let srvs = L.map qServer $ crSmpQueues crData
+4 -3
View File
@@ -35,6 +35,7 @@ import Control.Monad.IO.Unlift
import Control.Monad.Reader
import Crypto.Random
import Data.Int (Int64)
import Data.IORef (IORef, newIORef)
import Data.List.NonEmpty (NonEmpty)
import Data.Map (Map)
import Data.Time.Clock (NominalDiffTime, nominalDay)
@@ -178,7 +179,7 @@ defaultAgentConfig =
data Env = Env
{ config :: AgentConfig,
store :: SQLiteStore,
idsDrg :: TVar ChaChaDRG,
random :: IORef ChaChaDRG,
clientCounter :: TVar Int,
randomServer :: TVar StdGen,
ntfSupervisor :: NtfSupervisor,
@@ -187,12 +188,12 @@ data Env = Env
newSMPAgentEnv :: AgentConfig -> SQLiteStore -> IO Env
newSMPAgentEnv config@AgentConfig {initialClientId} store = do
idsDrg <- newTVarIO =<< liftIO drgNew
random <- newIORef =<< drgNew
clientCounter <- newTVarIO initialClientId
randomServer <- newTVarIO =<< liftIO newStdGen
ntfSupervisor <- atomically . newNtfSubSupervisor $ tbqSize config
xftpAgent <- atomically newXFTPAgent
pure Env {config, store, idsDrg, clientCounter, randomServer, ntfSupervisor, xftpAgent}
pure Env {config, store, random, clientCounter, randomServer, ntfSupervisor, xftpAgent}
createAgentStore :: FilePath -> String -> MigrationConfirmation -> IO (Either MigrationError SQLiteStore)
createAgentStore dbFilePath dbKey = createSQLiteStore dbFilePath dbKey Migrations.app
+12 -12
View File
@@ -218,7 +218,7 @@ where
import Control.Monad
import Control.Monad.Except
import Control.Monad.IO.Class
import Crypto.Random (ChaChaDRG, randomBytesGenerate)
import Crypto.Random (ChaChaDRG)
import qualified Data.Aeson.TH as J
import qualified Data.Attoparsec.ByteString.Char8 as A
import Data.Bifunctor (second)
@@ -510,7 +510,7 @@ deleteUsersWithoutConns db = do
pure userIds
createConn_ ::
TVar ChaChaDRG ->
IORef ChaChaDRG ->
ConnData ->
(ByteString -> IO ()) ->
IO (Either StoreError ByteString)
@@ -518,7 +518,7 @@ createConn_ gVar cData create = checkConstraint SEConnDuplicate $ case cData of
ConnData {connId = ""} -> createWithRandomId gVar create
ConnData {connId} -> create connId $> Right connId
createNewConn :: DB.Connection -> TVar ChaChaDRG -> ConnData -> SConnectionMode c -> IO (Either StoreError ConnId)
createNewConn :: DB.Connection -> IORef ChaChaDRG -> ConnData -> SConnectionMode c -> IO (Either StoreError ConnId)
createNewConn db gVar cData@ConnData {userId, connAgentVersion, enableNtfs, duplexHandshake} cMode =
createConn_ gVar cData $ \connId -> do
DB.execute db "INSERT INTO connections (user_id, conn_id, conn_mode, smp_agent_version, enable_ntfs, duplex_handshake) VALUES (?,?,?,?,?,?)" (userId, connId, cMode, connAgentVersion, enableNtfs, duplexHandshake)
@@ -543,14 +543,14 @@ updateNewConnSnd db connId sq =
updateConn :: IO (Either StoreError Int64)
updateConn = Right <$> addConnSndQueue_ db connId sq
createRcvConn :: DB.Connection -> TVar ChaChaDRG -> ConnData -> RcvQueue -> SConnectionMode c -> IO (Either StoreError ConnId)
createRcvConn :: DB.Connection -> IORef ChaChaDRG -> ConnData -> RcvQueue -> SConnectionMode c -> IO (Either StoreError ConnId)
createRcvConn db gVar cData@ConnData {userId, connAgentVersion, enableNtfs, duplexHandshake} q@RcvQueue {server} cMode =
createConn_ gVar cData $ \connId -> do
serverKeyHash_ <- createServer_ db server
DB.execute db "INSERT INTO connections (user_id, conn_id, conn_mode, smp_agent_version, enable_ntfs, duplex_handshake) VALUES (?,?,?,?,?,?)" (userId, connId, cMode, connAgentVersion, enableNtfs, duplexHandshake)
void $ insertRcvQueue_ db connId q serverKeyHash_
createSndConn :: DB.Connection -> TVar ChaChaDRG -> ConnData -> SndQueue -> IO (Either StoreError ConnId)
createSndConn :: DB.Connection -> IORef ChaChaDRG -> ConnData -> SndQueue -> IO (Either StoreError ConnId)
createSndConn db gVar cData@ConnData {userId, connAgentVersion, enableNtfs, duplexHandshake} q@SndQueue {server} =
-- check confirmed snd queue doesn't already exist, to prevent it being deleted by REPLACE in insertSndQueue_
ifM (liftIO $ checkConfirmedSndQueueExists_ db q) (pure $ Left SESndQueueExists) $
@@ -769,7 +769,7 @@ smpConfirmation (senderKey, e2ePubKey, connInfo, smpReplyQueues_, smpClientVersi
smpClientVersion = fromMaybe 1 smpClientVersion_
}
createConfirmation :: DB.Connection -> TVar ChaChaDRG -> NewConfirmation -> IO (Either StoreError ConfirmationId)
createConfirmation :: DB.Connection -> IORef ChaChaDRG -> NewConfirmation -> IO (Either StoreError ConfirmationId)
createConfirmation db gVar NewConfirmation {connId, senderConf = SMPConfirmation {senderKey, e2ePubKey, connInfo, smpReplyQueues, smpClientVersion}, ratchetState} =
createWithRandomId gVar $ \confirmationId ->
DB.execute
@@ -847,7 +847,7 @@ setHandshakeVersion :: DB.Connection -> ConnId -> Version -> Bool -> IO ()
setHandshakeVersion db connId aVersion duplexHS =
DB.execute db "UPDATE connections SET smp_agent_version = ?, duplex_handshake = ? WHERE conn_id = ?" (aVersion, duplexHS, connId)
createInvitation :: DB.Connection -> TVar ChaChaDRG -> NewInvitation -> IO (Either StoreError InvitationId)
createInvitation :: DB.Connection -> IORef ChaChaDRG -> NewInvitation -> IO (Either StoreError InvitationId)
createInvitation db gVar NewInvitation {contactConnId, connReq, recipientConnInfo} =
createWithRandomId gVar $ \invitationId ->
DB.execute
@@ -2074,7 +2074,7 @@ updateHashSnd_ dbConn connId SndMsgData {..} =
]
-- create record with a random ID
createWithRandomId :: TVar ChaChaDRG -> (ByteString -> IO ()) -> IO (Either StoreError ByteString)
createWithRandomId :: IORef ChaChaDRG -> (ByteString -> IO ()) -> IO (Either StoreError ByteString)
createWithRandomId gVar create = tryCreate 3
where
tryCreate :: Int -> IO (Either StoreError ByteString)
@@ -2087,8 +2087,8 @@ createWithRandomId gVar create = tryCreate 3
| SQL.sqlError e == SQL.ErrorConstraint -> tryCreate (n - 1)
| otherwise -> pure . Left . SEInternal $ bshow e
randomId :: TVar ChaChaDRG -> Int -> IO ByteString
randomId gVar n = U.encode <$> (atomically . stateTVar gVar $ randomBytesGenerate n)
randomId :: IORef ChaChaDRG -> Int -> IO ByteString
randomId gVar n = U.encode <$> C.pseudoRandomBytes' n gVar
ntfSubAndSMPAction :: NtfSubAction -> (Maybe NtfSubNTFAction, Maybe NtfSubSMPAction)
ntfSubAndSMPAction (NtfSubNTFAction action) = (Just action, Nothing)
@@ -2109,7 +2109,7 @@ getXFTPServerId_ db ProtocolServer {host, port, keyHash} = do
firstRow fromOnly SEXFTPServerNotFound $
DB.query db "SELECT xftp_server_id FROM xftp_servers WHERE xftp_host = ? AND xftp_port = ? AND xftp_key_hash = ?" (host, port, keyHash)
createRcvFile :: DB.Connection -> TVar ChaChaDRG -> UserId -> FileDescription 'FRecipient -> FilePath -> FilePath -> CryptoFile -> IO (Either StoreError RcvFileId)
createRcvFile :: DB.Connection -> IORef ChaChaDRG -> UserId -> FileDescription 'FRecipient -> FilePath -> FilePath -> CryptoFile -> IO (Either StoreError RcvFileId)
createRcvFile db gVar userId fd@FileDescription {chunks} prefixPath tmpPath (CryptoFile savePath cfArgs) = runExceptT $ do
(rcvFileEntityId, rcvFileId) <- ExceptT $ insertRcvFile fd
liftIO $
@@ -2364,7 +2364,7 @@ getRcvFilesExpired db ttl = do
|]
(Only cutoffTs)
createSndFile :: DB.Connection -> TVar ChaChaDRG -> UserId -> CryptoFile -> Int -> FilePath -> C.SbKey -> C.CbNonce -> IO (Either StoreError SndFileId)
createSndFile :: DB.Connection -> IORef ChaChaDRG -> UserId -> CryptoFile -> Int -> FilePath -> C.SbKey -> C.CbNonce -> IO (Either StoreError SndFileId)
createSndFile db gVar userId (CryptoFile path cfArgs) numRecipients prefixPath key nonce =
createWithRandomId gVar $ \sndFileEntityId ->
DB.execute
+6
View File
@@ -130,6 +130,7 @@ module Simplex.Messaging.Crypto
-- * pseudo-random bytes
pseudoRandomBytes,
pseudoRandomBytes',
-- * digests
sha256Hash,
@@ -191,8 +192,10 @@ import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.ByteString.Lazy (fromStrict, toStrict)
import Data.Constraint (Dict (..))
import Data.IORef (IORef, atomicModifyIORef')
import Data.Kind (Constraint, Type)
import Data.String
import Data.Tuple (swap)
import Data.Type.Equality
import Data.Typeable (Proxy (Proxy), Typeable)
import Data.Word (Word32)
@@ -1144,6 +1147,9 @@ pseudoRandomCbNonce gVar = CryptoBoxNonce <$> pseudoRandomBytes 24 gVar
pseudoRandomBytes :: Int -> TVar ChaChaDRG -> STM ByteString
pseudoRandomBytes n gVar = stateTVar gVar $ randomBytesGenerate n
pseudoRandomBytes' :: Int -> IORef ChaChaDRG -> IO ByteString
pseudoRandomBytes' n gVar = atomicModifyIORef' gVar $ swap . randomBytesGenerate n
instance Encoding CbNonce where
smpEncode = unCbNonce
smpP = CryptoBoxNonce <$> A.take 24
+20 -19
View File
@@ -17,6 +17,7 @@ import Control.Exception (SomeException)
import Control.Monad (replicateM_)
import Crypto.Random (drgNew)
import Data.ByteString.Char8 (ByteString)
import Data.IORef (newIORef)
import Data.List (isInfixOf)
import qualified Data.Text as T
import Data.Text.Encoding (encodeUtf8)
@@ -116,7 +117,7 @@ storeTests = do
testConcurrentWrites :: SpecWith (SQLiteStore, SQLiteStore)
testConcurrentWrites =
it "should complete multiple concurrent write transactions w/t sqlite busy errors" $ \(s1, s2) -> do
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
_ <- withTransaction s1 $ \db ->
createRcvConn db g cData1 rcvQueue1 SCMInvitation
let ConnData {connId} = cData1
@@ -206,7 +207,7 @@ sndQueue1 =
testCreateRcvConn :: SpecWith SQLiteStore
testCreateRcvConn =
it "should create RcvConnection and add SndQueue" . withStoreTransaction $ \db -> do
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
createRcvConn db g cData1 rcvQueue1 SCMInvitation
`shouldReturn` Right "conn1"
getConn db "conn1"
@@ -219,7 +220,7 @@ testCreateRcvConn =
testCreateRcvConnRandomId :: SpecWith SQLiteStore
testCreateRcvConnRandomId =
it "should create RcvConnection and add SndQueue with random ID" . withStoreTransaction $ \db -> do
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
Right connId <- createRcvConn db g cData1 {connId = ""} rcvQueue1 SCMInvitation
let rq' = (rcvQueue1 :: RcvQueue) {connId}
sq' = (sndQueue1 :: SndQueue) {connId}
@@ -233,7 +234,7 @@ testCreateRcvConnRandomId =
testCreateRcvConnDuplicate :: SpecWith SQLiteStore
testCreateRcvConnDuplicate =
it "should throw error on attempt to create duplicate RcvConnection" . withStoreTransaction $ \db -> do
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
_ <- createRcvConn db g cData1 rcvQueue1 SCMInvitation
createRcvConn db g cData1 rcvQueue1 SCMInvitation
`shouldReturn` Left SEConnDuplicate
@@ -241,7 +242,7 @@ testCreateRcvConnDuplicate =
testCreateSndConn :: SpecWith SQLiteStore
testCreateSndConn =
it "should create SndConnection and add RcvQueue" . withStoreTransaction $ \db -> do
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
createSndConn db g cData1 sndQueue1
`shouldReturn` Right "conn1"
getConn db "conn1"
@@ -254,7 +255,7 @@ testCreateSndConn =
testCreateSndConnRandomID :: SpecWith SQLiteStore
testCreateSndConnRandomID =
it "should create SndConnection and add RcvQueue with random ID" . withStoreTransaction $ \db -> do
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
Right connId <- createSndConn db g cData1 {connId = ""} sndQueue1
let rq' = (rcvQueue1 :: RcvQueue) {connId}
sq' = (sndQueue1 :: SndQueue) {connId}
@@ -268,7 +269,7 @@ testCreateSndConnRandomID =
testCreateSndConnDuplicate :: SpecWith SQLiteStore
testCreateSndConnDuplicate =
it "should throw error on attempt to create duplicate SndConnection" . withStoreTransaction $ \db -> do
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
_ <- createSndConn db g cData1 sndQueue1
createSndConn db g cData1 sndQueue1
`shouldReturn` Left SEConnDuplicate
@@ -278,7 +279,7 @@ testGetRcvConn =
it "should get connection using rcv queue id and server" . withStoreTransaction $ \db -> do
let smpServer = SMPServer "smp.simplex.im" "5223" testKeyHash
let recipientId = "1234"
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
_ <- createRcvConn db g cData1 rcvQueue1 SCMInvitation
getRcvConn db smpServer recipientId
`shouldReturn` Right (rcvQueue1, SomeConn SCRcv (RcvConnection cData1 rcvQueue1))
@@ -286,7 +287,7 @@ testGetRcvConn =
testDeleteRcvConn :: SpecWith SQLiteStore
testDeleteRcvConn =
it "should create RcvConnection and delete it" . withStoreTransaction $ \db -> do
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
_ <- createRcvConn db g cData1 rcvQueue1 SCMInvitation
getConn db "conn1"
`shouldReturn` Right (SomeConn SCRcv (RcvConnection cData1 rcvQueue1))
@@ -298,7 +299,7 @@ testDeleteRcvConn =
testDeleteSndConn :: SpecWith SQLiteStore
testDeleteSndConn =
it "should create SndConnection and delete it" . withStoreTransaction $ \db -> do
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
_ <- createSndConn db g cData1 sndQueue1
getConn db "conn1"
`shouldReturn` Right (SomeConn SCSnd (SndConnection cData1 sndQueue1))
@@ -310,7 +311,7 @@ testDeleteSndConn =
testDeleteDuplexConn :: SpecWith SQLiteStore
testDeleteDuplexConn =
it "should create DuplexConnection and delete it" . withStoreTransaction $ \db -> do
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
_ <- createRcvConn db g cData1 rcvQueue1 SCMInvitation
_ <- upgradeRcvConnToDuplex db "conn1" sndQueue1
getConn db "conn1"
@@ -323,7 +324,7 @@ testDeleteDuplexConn =
testUpgradeRcvConnToDuplex :: SpecWith SQLiteStore
testUpgradeRcvConnToDuplex =
it "should throw error on attempt to add SndQueue to SndConnection or DuplexConnection" . withStoreTransaction $ \db -> do
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
_ <- createSndConn db g cData1 sndQueue1
let anotherSndQueue =
SndQueue
@@ -351,7 +352,7 @@ testUpgradeRcvConnToDuplex =
testUpgradeSndConnToDuplex :: SpecWith SQLiteStore
testUpgradeSndConnToDuplex =
it "should throw error on attempt to add RcvQueue to RcvConnection or DuplexConnection" . withStoreTransaction $ \db -> do
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
_ <- createRcvConn db g cData1 rcvQueue1 SCMInvitation
let anotherRcvQueue =
RcvQueue
@@ -382,7 +383,7 @@ testUpgradeSndConnToDuplex =
testSetRcvQueueStatus :: SpecWith SQLiteStore
testSetRcvQueueStatus =
it "should update status of RcvQueue" . withStoreTransaction $ \db -> do
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
_ <- createRcvConn db g cData1 rcvQueue1 SCMInvitation
getConn db "conn1"
`shouldReturn` Right (SomeConn SCRcv (RcvConnection cData1 rcvQueue1))
@@ -394,7 +395,7 @@ testSetRcvQueueStatus =
testSetSndQueueStatus :: SpecWith SQLiteStore
testSetSndQueueStatus =
it "should update status of SndQueue" . withStoreTransaction $ \db -> do
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
_ <- createSndConn db g cData1 sndQueue1
getConn db "conn1"
`shouldReturn` Right (SomeConn SCSnd (SndConnection cData1 sndQueue1))
@@ -406,7 +407,7 @@ testSetSndQueueStatus =
testSetQueueStatusDuplex :: SpecWith SQLiteStore
testSetQueueStatusDuplex =
it "should update statuses of RcvQueue and SndQueue in DuplexConnection" . withStoreTransaction $ \db -> do
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
_ <- createRcvConn db g cData1 rcvQueue1 SCMInvitation
_ <- upgradeRcvConnToDuplex db "conn1" sndQueue1
getConn db "conn1"
@@ -458,7 +459,7 @@ testCreateRcvMsg_ db expectedPrevSndId expectedPrevHash connId rq rcvMsgData@Rcv
testCreateRcvMsg :: SpecWith SQLiteStore
testCreateRcvMsg =
it "should reserve internal ids and create a RcvMsg" $ \st -> do
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
let ConnData {connId} = cData1
_ <- withTransaction st $ \db -> do
createRcvConn db g cData1 rcvQueue1 SCMInvitation
@@ -489,7 +490,7 @@ testCreateSndMsg_ db expectedPrevHash connId sndMsgData@SndMsgData {..} = do
testCreateSndMsg :: SpecWith SQLiteStore
testCreateSndMsg =
it "should create a SndMsg and return InternalId and PrevSndMsgHash" $ \st -> do
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
let ConnData {connId} = cData1
_ <- withTransaction st $ \db -> do
createSndConn db g cData1 sndQueue1
@@ -502,7 +503,7 @@ testCreateRcvAndSndMsgs =
it "should create multiple RcvMsg and SndMsg, correctly ordering internal Ids and returning previous state" $ \st -> do
let ConnData {connId} = cData1
_ <- withTransaction st $ \db -> do
g <- newTVarIO =<< drgNew
g <- newIORef =<< drgNew
createRcvConn db g cData1 rcvQueue1 SCMInvitation
withTransaction st $ \db -> do
_ <- upgradeRcvConnToDuplex db "conn1" sndQueue1