diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index a99d957e3..74ae0055f 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -170,7 +170,7 @@ import Data.List.NonEmpty (NonEmpty (..), (<|)) import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) import qualified Data.Map.Strict as M -import Data.Maybe (isJust, isNothing, listToMaybe) +import Data.Maybe (catMaybes, isJust, isNothing, listToMaybe) import Data.Set (Set) import qualified Data.Set as S import Data.Text (Text) @@ -678,15 +678,12 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess serverDown :: ([RcvQueue], [ConnId]) -> IO () serverDown (qs, conns) = whenM (readTVarIO active) $ do incClientStat' c userId client "DISCONNECT" "" - notifySub "" $ hostEvent' DISCONNECT client - unless (null conns) $ notifySub "" $ DOWN srv conns + notifySub c "" $ hostEvent' DISCONNECT client + unless (null conns) $ notifySub c "" $ DOWN srv conns unless (null qs) $ do atomically $ mapM_ (releaseGetLock c) qs runReaderT (resubscribeSMPSession c tSess) env - notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> IO () - notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd) - resubscribeSMPSession :: AgentClient -> SMPTransportSession -> AM' () resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = atomically getWorkerVar >>= mapM_ (either newSubWorker (\_ -> pure ())) @@ -721,21 +718,22 @@ reconnectSMPClient c tSess@(_, srv, _) qs = handleNotify $ do (rs, sessId_) <- subscribeQueues c $ L.toList qs let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs conns = filter (`M.notMember` cs) okConns - unless (null conns) $ notifySub "" $ UP srv conns + unless (null conns) $ notifySub c "" $ UP srv conns let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs - mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs + mapM_ (\(connId, e) -> notifySub c connId $ ERR e) finalErrs forM_ (listToMaybe tempErrs) $ \(connId, e) -> do when (null okConns && M.null cs && null finalErrs) . liftIO $ forM_ sessId_ $ \sessId -> do -- We only close the client session that was used to subscribe. v_ <- atomically $ ifM (activeClientSession c tSess sessId) (TM.lookupDelete tSess $ smpClients c) (pure Nothing) mapM_ (closeClient_ c) v_ - notifySub connId $ ERR e + notifySub c connId $ ERR e where handleNotify :: AM' () -> AM' () - handleNotify = E.handleAny $ notifySub "" . ERR . INTERNAL . show - notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> AM' () - notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd) + handleNotify = E.handleAny $ notifySub c "" . ERR . INTERNAL . show + +notifySub :: forall e m. (AEntityI e, MonadIO m) => AgentClient -> ConnId -> ACommand 'Agent e -> m () +notifySub c connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd) getNtfServerClient :: AgentClient -> NtfTransportSession -> AM NtfClient getNtfServerClient c@AgentClient {active, ntfClients, workerSeq} tSess@(userId, srv, _) = do @@ -1291,14 +1289,10 @@ newRcvQueue c userId connId (ProtoServerWithAuth srv auth) vRange subMode = do qUri = SMPQueueUri vRange $ SMPQueueAddress srv sndId e2eDhKey pure (rq, qUri, tSess, sessId) -processSubResult :: AgentClient -> RcvQueue -> Either SMPClientError () -> STM () +processSubResult :: AgentClient -> RcvQueue -> Either SMPClientError () -> STM (Maybe ConnId) processSubResult c rq@RcvQueue {connId} = \case - Left e -> - unless (temporaryClientError e) $ - failSubscription c rq e - Right () -> - whenM (hasPendingSubscription c connId) $ - addSubscription c rq + Left e -> Nothing <$ unless (temporaryClientError e) (failSubscription c rq e) + Right () -> ifM (hasPendingSubscription c connId) (Just connId <$ addSubscription c rq) (pure Nothing) temporaryAgentError :: AgentErrorType -> Bool temporaryAgentError = \case @@ -1349,23 +1343,26 @@ subscribeQueues c qs = do subscribeQueues_ :: Env -> TVar (Maybe SessionId) -> SMPClient -> NonEmpty RcvQueue -> IO (BatchResponses SMPClientError ()) subscribeQueues_ env session smp qs' = do rs <- sendBatch subscribeSMPQueues smp qs' - active <- + (active, subResults) <- atomically $ ifM (activeClientSession c tSess sessId) - (writeTVar session (Just sessId) >> processSubResults rs $> True) - (pure False) + (writeTVar session (Just sessId) >> ((True,) <$> processSubResults rs)) + (pure (False, [])) if active - then when (hasTempErrors rs) resubscribe $> rs + then do + when (any isNothing subResults) resubscribe + let up = catMaybes $ L.toList subResults + unless (null up) $ notifySub c "" $ UP srv up + pure rs else do logWarn "subcription batch result for replaced SMP client, resubscribing" resubscribe $> L.map (second $ \_ -> Left PCENetworkError) rs where - tSess = transportSession' smp + tSess@(_, srv, _) = transportSession' smp sessId = sessionId $ thParams smp - hasTempErrors = any (either temporaryClientError (const False) . snd) - processSubResults :: NonEmpty (RcvQueue, Either SMPClientError ()) -> STM () - processSubResults = mapM_ $ uncurry $ processSubResult c + processSubResults :: NonEmpty (RcvQueue, Either SMPClientError ()) -> STM (NonEmpty (Maybe ConnId)) + processSubResults = mapM (uncurry $ processSubResult c) resubscribe = resubscribeSMPSession c tSess `runReaderT` env activeClientSession :: AgentClient -> SMPTransportSession -> SessionId -> STM Bool