diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 5cee13966..f8ef4d5b9 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -118,7 +118,7 @@ module Simplex.Messaging.Agent ) where -import Control.Logger.Simple (logError, logInfo, showText) +import Control.Logger.Simple import Control.Monad import Control.Monad.Except import Control.Monad.Reader @@ -2076,6 +2076,8 @@ data ACKd = ACKd | ACKPending -- it cannot be finally, unfortunately, as sometimes it needs to be ACK+DEL processSMPTransmission :: AgentClient -> ServerTransmission SMPVersion ErrorType BrokerMsg -> AM () processSMPTransmission c@AgentClient {smpClients, subQ} (tSess@(_, srv, _), _v, sessId, tType, rId, cmd) = do + active <- atomically $ activeClientSession c tSess sessId + unless active $ logNote $ "Inactive client for " <> tshow (strEncode srv) (rq, SomeConn _ conn) <- withStore c (\db -> getRcvConn db srv rId) processSMP rq conn $ toConnData conn where diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 887e3629f..336bd9967 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -172,6 +172,7 @@ import Data.Text.Encoding import Data.Time (UTCTime, defaultTimeLocale, diffUTCTime, formatTime, getCurrentTime) import Data.Time.Clock.System (getSystemTime) import Data.Word (Word16) +import GHC.Stack (HasCallStack, withFrozenCallStack) import Network.Socket (HostName) import Simplex.FileTransfer.Client (XFTPChunkSpec (..), XFTPClient, XFTPClientConfig (..), XFTPClientError) import qualified Simplex.FileTransfer.Client as X @@ -890,42 +891,42 @@ getMapLock locks key = TM.lookup key locks >>= maybe newLock pure where newLock = createLock >>= \l -> TM.insert key l locks $> l -withClient_ :: forall a v err msg. ProtocolServerClient v err msg => AgentClient -> TransportSession msg -> ByteString -> (Client msg -> AM a) -> AM a +withClient_ :: forall a v err msg. (HasCallStack, ProtocolServerClient v err msg) => AgentClient -> TransportSession msg -> ByteString -> (Client msg -> AM a) -> AM a withClient_ c tSess@(userId, srv, _) statCmd action = do cl <- getProtocolServerClient c tSess - (action cl <* stat cl "OK") `catchAgentError` logServerError cl + (action cl <* stat cl "OK") `catchAgentError` \e -> withFrozenCallStack $ logServerError cl e where stat cl = liftIO . incClientStat c userId cl statCmd - logServerError :: Client msg -> AgentErrorType -> AM a + logServerError :: HasCallStack => Client msg -> AgentErrorType -> AM a logServerError cl e = do - logServer "<--" c srv "" $ strEncode e + withFrozenCallStack $ logServer "<--" c srv "" $ strEncode e stat cl $ strEncode e throwError e -withLogClient_ :: ProtocolServerClient v err msg => AgentClient -> TransportSession msg -> EntityId -> ByteString -> (Client msg -> AM a) -> AM a +withLogClient_ :: (HasCallStack, ProtocolServerClient v err msg) => AgentClient -> TransportSession msg -> EntityId -> ByteString -> (Client msg -> AM a) -> AM a withLogClient_ c tSess@(_, srv, _) entId cmdStr action = do - logServer "-->" c srv entId cmdStr + withFrozenCallStack $ logServer "-->" c srv entId cmdStr res <- withClient_ c tSess cmdStr action - logServer "<--" c srv entId "OK" + withFrozenCallStack $ logServer "<--" c srv entId "OK" return res -withClient :: forall v err msg a. ProtocolServerClient v err msg => AgentClient -> TransportSession msg -> ByteString -> (Client msg -> ExceptT (ProtocolClientError err) IO a) -> AM a -withClient c tSess statKey action = withClient_ c tSess statKey $ \client -> liftClient (clientProtocolError @v @err @msg) (clientServer client) $ action client +withClient :: forall v err msg a. (HasCallStack, ProtocolServerClient v err msg) => AgentClient -> TransportSession msg -> ByteString -> (Client msg -> ExceptT (ProtocolClientError err) IO a) -> AM a +withClient c tSess statKey action = withFrozenCallStack $ withClient_ c tSess statKey $ \client -> liftClient (clientProtocolError @v @err @msg) (clientServer client) $ action client {-# INLINE withClient #-} -withLogClient :: forall v err msg a. ProtocolServerClient v err msg => AgentClient -> TransportSession msg -> EntityId -> ByteString -> (Client msg -> ExceptT (ProtocolClientError err) IO a) -> AM a -withLogClient c tSess entId cmdStr action = withLogClient_ c tSess entId cmdStr $ \client -> liftClient (clientProtocolError @v @err @msg) (clientServer client) $ action client +withLogClient :: forall v err msg a. (HasCallStack, ProtocolServerClient v err msg) => AgentClient -> TransportSession msg -> EntityId -> ByteString -> (Client msg -> ExceptT (ProtocolClientError err) IO a) -> AM a +withLogClient c tSess entId cmdStr action = withFrozenCallStack $ withLogClient_ c tSess entId cmdStr $ \client -> liftClient (clientProtocolError @v @err @msg) (clientServer client) $ action client {-# INLINE withLogClient #-} -withSMPClient :: SMPQueueRec q => AgentClient -> q -> ByteString -> (SMPClient -> ExceptT SMPClientError IO a) -> AM a +withSMPClient :: (HasCallStack, SMPQueueRec q) => AgentClient -> q -> ByteString -> (SMPClient -> ExceptT SMPClientError IO a) -> AM a withSMPClient c q cmdStr action = do tSess <- liftIO $ mkSMPTransportSession c q - withLogClient c tSess (queueId q) cmdStr action + withFrozenCallStack $ withLogClient c tSess (queueId q) cmdStr action -withSMPClient_ :: SMPQueueRec q => AgentClient -> q -> ByteString -> (SMPClient -> AM a) -> AM a +withSMPClient_ :: (HasCallStack, SMPQueueRec q) => AgentClient -> q -> ByteString -> (SMPClient -> AM a) -> AM a withSMPClient_ c q cmdStr action = do tSess <- liftIO $ mkSMPTransportSession c q - withLogClient_ c tSess (queueId q) cmdStr action + withFrozenCallStack $ withLogClient_ c tSess (queueId q) cmdStr action withNtfClient :: AgentClient -> NtfServer -> EntityId -> ByteString -> (NtfClient -> ExceptT NtfClientError IO a) -> AM a withNtfClient c srv = withLogClient c (0, srv, Nothing) @@ -1179,7 +1180,9 @@ subscribeQueues c qs = do pure $ if prohibited then Left (rq, Left $ CMD PROHIBITED) else Right rq subscribeQueues_ :: Env -> TVar (Maybe SessionId) -> SMPClient -> NonEmpty RcvQueue -> IO (BatchResponses SMPClientError ()) subscribeQueues_ env session smp qs' = do + atomically . modifyTVar (sentSubs smp) . M.union $ M.fromList [(connId, False) | RcvQueue {connId} <- L.toList qs'] rs <- sendBatch subscribeSMPQueues smp qs' + atomically . modifyTVar (sentSubs smp) . M.union $ M.fromList [(connId, True) | (RcvQueue {connId}, Right ()) <- L.toList rs] active <- atomically $ ifM @@ -1278,9 +1281,9 @@ getSubscriptions :: AgentClient -> STM (Set ConnId) getSubscriptions = readTVar . subscrConns {-# INLINE getSubscriptions #-} -logServer :: MonadIO m => ByteString -> AgentClient -> ProtocolServer s -> QueueId -> ByteString -> m () +logServer :: (HasCallStack, MonadIO m) => ByteString -> AgentClient -> ProtocolServer s -> QueueId -> ByteString -> m () logServer dir AgentClient {clientId} srv qId cmdStr = - logInfo . decodeUtf8 $ B.unwords ["A", "(" <> bshow clientId <> ")", dir, showServer srv, ":", logSecret qId, cmdStr] + withFrozenCallStack $ logInfo . decodeUtf8 $ B.unwords ["A", "(" <> bshow clientId <> ")", dir, showServer srv, ":", logSecret qId, cmdStr] {-# INLINE logServer #-} showServer :: ProtocolServer s -> ByteString diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 969d68e95..dc7b48d67 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -28,7 +28,7 @@ module Simplex.Messaging.Client ( -- * Connect (disconnect) client to (from) SMP server TransportSession, - ProtocolClient (thParams, sessionTs), + ProtocolClient (thParams, sessionTs, sentSubs), SMPClient, getProtocolClient, closeProtocolClient, @@ -123,6 +123,7 @@ data ProtocolClient v err msg = ProtocolClient { action :: Maybe (Async ()), thParams :: THandleParams v 'TClient, sessionTs :: UTCTime, + sentSubs :: TMap ByteString Bool, -- DEBUG: ConnId -> sub status client_ :: PClient v err msg } @@ -149,6 +150,7 @@ smpClientStub g sessionId thVersion thAuth = do clientCorrId <- C.newRandomDRG g sentCommands <- TM.empty sendPings <- newTVar False + sentSubs <- TM.empty lastReceived <- newTVar ts timeoutErrorCount <- newTVar 0 sndQ <- newTBQueue 100 @@ -166,6 +168,7 @@ smpClientStub g sessionId thVersion thAuth = do batch = True }, sessionTs = ts, + sentSubs, client_ = PClient { connected, @@ -402,7 +405,8 @@ getProtocolClient g transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize Left e -> atomically . putTMVar cVar . Left $ PCETransportError e Right th@THandle {params} -> do sessionTs <- getCurrentTime - let c' = ProtocolClient {action = Nothing, client_ = c, thParams = params, sessionTs} + sentSubs <- atomically TM.empty + let c' = ProtocolClient {action = Nothing, client_ = c, thParams = params, sessionTs, sentSubs} atomically $ writeTVar (lastReceived c) sessionTs atomically $ do writeTVar (connected c) True