Add subs to ProtocolClient

This commit is contained in:
IC Rainbow
2024-05-06 13:20:42 +03:00
parent fe8e8366c8
commit 29c673dd64
3 changed files with 29 additions and 20 deletions
+3 -1
View File
@@ -118,7 +118,7 @@ module Simplex.Messaging.Agent
)
where
import Control.Logger.Simple (logError, logInfo, showText)
import Control.Logger.Simple
import Control.Monad
import Control.Monad.Except
import Control.Monad.Reader
@@ -2076,6 +2076,8 @@ data ACKd = ACKd | ACKPending
-- it cannot be finally, unfortunately, as sometimes it needs to be ACK+DEL
processSMPTransmission :: AgentClient -> ServerTransmission SMPVersion ErrorType BrokerMsg -> AM ()
processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, sessId, tType, rId, cmd) = do
active <- atomically $ activeClientSession c tSess sessId
unless active $ logNote $ "Inactive client for " <> tshow (strEncode srv)
(rq, SomeConn _ conn) <- withStore c (\db -> getRcvConn db srv rId)
processSMP rq conn $ toConnData conn
where
+20 -17
View File
@@ -172,6 +172,7 @@ import Data.Text.Encoding
import Data.Time (UTCTime, defaultTimeLocale, diffUTCTime, formatTime, getCurrentTime)
import Data.Time.Clock.System (getSystemTime)
import Data.Word (Word16)
import GHC.Stack (HasCallStack, withFrozenCallStack)
import Network.Socket (HostName)
import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError)
import qualified Simplex.FileTransfer.Client as X
@@ -890,42 +891,42 @@ getMapLock locks key = TM.lookup key locks >>= maybe newLock pure
where
newLock = createLock >>= \l -> TM.insert key l locks $> l
withClient_ :: forall a v err msg. ProtocolServerClient v err msg => AgentClient -> TransportSession msg -> ByteString -> (Client msg -> AM a) -> AM a
withClient_ :: forall a v err msg. (HasCallStack, ProtocolServerClient v err msg) => AgentClient -> TransportSession msg -> ByteString -> (Client msg -> AM a) -> AM a
withClient_ c tSess@(userId, srv, _) statCmd action = do
cl <- getProtocolServerClient c tSess
(action cl <* stat cl "OK") `catchAgentError` logServerError cl
(action cl <* stat cl "OK") `catchAgentError` \e -> withFrozenCallStack $ logServerError cl e
where
stat cl = liftIO . incClientStat c userId cl statCmd
logServerError :: Client msg -> AgentErrorType -> AM a
logServerError :: HasCallStack => Client msg -> AgentErrorType -> AM a
logServerError cl e = do
logServer "<--" c srv "" $ strEncode e
withFrozenCallStack $ logServer "<--" c srv "" $ strEncode e
stat cl $ strEncode e
throwError e
withLogClient_ :: ProtocolServerClient v err msg => AgentClient -> TransportSession msg -> EntityId -> ByteString -> (Client msg -> AM a) -> AM a
withLogClient_ :: (HasCallStack, ProtocolServerClient v err msg) => AgentClient -> TransportSession msg -> EntityId -> ByteString -> (Client msg -> AM a) -> AM a
withLogClient_ c tSess@(_, srv, _) entId cmdStr action = do
logServer "-->" c srv entId cmdStr
withFrozenCallStack $ logServer "-->" c srv entId cmdStr
res <- withClient_ c tSess cmdStr action
logServer "<--" c srv entId "OK"
withFrozenCallStack $ logServer "<--" c srv entId "OK"
return res
withClient :: forall v err msg a. ProtocolServerClient v err msg => AgentClient -> TransportSession msg -> ByteString -> (Client msg -> ExceptT (ProtocolClientError err) IO a) -> AM a
withClient c tSess statKey action = withClient_ c tSess statKey $ \client -> liftClient (clientProtocolError @v @err @msg) (clientServer client) $ action client
withClient :: forall v err msg a. (HasCallStack, ProtocolServerClient v err msg) => AgentClient -> TransportSession msg -> ByteString -> (Client msg -> ExceptT (ProtocolClientError err) IO a) -> AM a
withClient c tSess statKey action = withFrozenCallStack $ withClient_ c tSess statKey $ \client -> liftClient (clientProtocolError @v @err @msg) (clientServer client) $ action client
{-# INLINE withClient #-}
withLogClient :: forall v err msg a. ProtocolServerClient v err msg => AgentClient -> TransportSession msg -> EntityId -> ByteString -> (Client msg -> ExceptT (ProtocolClientError err) IO a) -> AM a
withLogClient c tSess entId cmdStr action = withLogClient_ c tSess entId cmdStr $ \client -> liftClient (clientProtocolError @v @err @msg) (clientServer client) $ action client
withLogClient :: forall v err msg a. (HasCallStack, ProtocolServerClient v err msg) => AgentClient -> TransportSession msg -> EntityId -> ByteString -> (Client msg -> ExceptT (ProtocolClientError err) IO a) -> AM a
withLogClient c tSess entId cmdStr action = withFrozenCallStack $ withLogClient_ c tSess entId cmdStr $ \client -> liftClient (clientProtocolError @v @err @msg) (clientServer client) $ action client
{-# INLINE withLogClient #-}
withSMPClient :: SMPQueueRec q => AgentClient -> q -> ByteString -> (SMPClient -> ExceptT SMPClientError IO a) -> AM a
withSMPClient :: (HasCallStack, SMPQueueRec q) => AgentClient -> q -> ByteString -> (SMPClient -> ExceptT SMPClientError IO a) -> AM a
withSMPClient c q cmdStr action = do
tSess <- liftIO $ mkSMPTransportSession c q
withLogClient c tSess (queueId q) cmdStr action
withFrozenCallStack $ withLogClient c tSess (queueId q) cmdStr action
withSMPClient_ :: SMPQueueRec q => AgentClient -> q -> ByteString -> (SMPClient -> AM a) -> AM a
withSMPClient_ :: (HasCallStack, SMPQueueRec q) => AgentClient -> q -> ByteString -> (SMPClient -> AM a) -> AM a
withSMPClient_ c q cmdStr action = do
tSess <- liftIO $ mkSMPTransportSession c q
withLogClient_ c tSess (queueId q) cmdStr action
withFrozenCallStack $ withLogClient_ c tSess (queueId q) cmdStr action
withNtfClient :: AgentClient -> NtfServer -> EntityId -> ByteString -> (NtfClient -> ExceptT NtfClientError IO a) -> AM a
withNtfClient c srv = withLogClient c (0, srv, Nothing)
@@ -1179,7 +1180,9 @@ subscribeQueues c qs = do
pure $ if prohibited then Left (rq, Left $ CMD PROHIBITED) else Right rq
subscribeQueues_ :: Env -> TVar (Maybe SessionId) -> SMPClient -> NonEmpty RcvQueue -> IO (BatchResponses SMPClientError ())
subscribeQueues_ env session smp qs' = do
atomically . modifyTVar (sentSubs smp) . M.union $ M.fromList [(connId, False) | RcvQueue {connId} <- L.toList qs']
rs <- sendBatch subscribeSMPQueues smp qs'
atomically . modifyTVar (sentSubs smp) . M.union $ M.fromList [(connId, True) | (RcvQueue {connId}, Right ()) <- L.toList rs]
active <-
atomically $
ifM
@@ -1278,9 +1281,9 @@ getSubscriptions :: AgentClient -> STM (Set ConnId)
getSubscriptions = readTVar . subscrConns
{-# INLINE getSubscriptions #-}
logServer :: MonadIO m => ByteString -> AgentClient -> ProtocolServer s -> QueueId -> ByteString -> m ()
logServer :: (HasCallStack, MonadIO m) => ByteString -> AgentClient -> ProtocolServer s -> QueueId -> ByteString -> m ()
logServer dir AgentClient {clientId} srv qId cmdStr =
logInfo . decodeUtf8 $ B.unwords ["A", "(" <> bshow clientId <> ")", dir, showServer srv, ":", logSecret qId, cmdStr]
withFrozenCallStack $ logInfo . decodeUtf8 $ B.unwords ["A", "(" <> bshow clientId <> ")", dir, showServer srv, ":", logSecret qId, cmdStr]
{-# INLINE logServer #-}
showServer :: ProtocolServer s -> ByteString
+6 -2
View File
@@ -28,7 +28,7 @@
module Simplex.Messaging.Client
( -- * Connect (disconnect) client to (from) SMP server
TransportSession,
ProtocolClient (thParams, sessionTs),
ProtocolClient (thParams, sessionTs, sentSubs),
SMPClient,
getProtocolClient,
closeProtocolClient,
@@ -123,6 +123,7 @@ data ProtocolClient v err msg = ProtocolClient
{ action :: Maybe (Async ()),
thParams :: THandleParams v 'TClient,
sessionTs :: UTCTime,
sentSubs :: TMap ByteString Bool, -- DEBUG: ConnId -> sub status
client_ :: PClient v err msg
}
@@ -149,6 +150,7 @@ smpClientStub g sessionId thVersion thAuth = do
clientCorrId <- C.newRandomDRG g
sentCommands <- TM.empty
sendPings <- newTVar False
sentSubs <- TM.empty
lastReceived <- newTVar ts
timeoutErrorCount <- newTVar 0
sndQ <- newTBQueue 100
@@ -166,6 +168,7 @@ smpClientStub g sessionId thVersion thAuth = do
batch = True
},
sessionTs = ts,
sentSubs,
client_ =
PClient
{ connected,
@@ -402,7 +405,8 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize
Left e -> atomically . putTMVar cVar . Left $ PCETransportError e
Right th@THandle {params} -> do
sessionTs <- getCurrentTime
let c' = ProtocolClient {action = Nothing, client_ = c, thParams = params, sessionTs}
sentSubs <- atomically TM.empty
let c' = ProtocolClient {action = Nothing, client_ = c, thParams = params, sessionTs, sentSubs}
atomically $ writeTVar (lastReceived c) sessionTs
atomically $ do
writeTVar (connected c) True