diff --git a/package.yaml b/package.yaml index bf8d45b26..8a9b664db 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: simplexmq -version: 5.1.2 +version: 5.1.3 synopsis: SimpleXMQ message broker description: | This package includes <./docs/Simplex-Messaging-Server.html server>, diff --git a/simplexmq.cabal b/simplexmq.cabal index 341e1a1d1..601e0a2e5 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -5,7 +5,7 @@ cabal-version: 1.12 -- see: https://github.com/sol/hpack name: simplexmq -version: 5.1.2 +version: 5.1.3 synopsis: SimpleXMQ message broker description: This package includes <./docs/Simplex-Messaging-Server.html server>, <./docs/Simplex-Messaging-Client.html client> and diff --git a/src/Simplex/FileTransfer/Client/Main.hs b/src/Simplex/FileTransfer/Client/Main.hs index badf4e7c0..20b6d177d 100644 --- a/src/Simplex/FileTransfer/Client/Main.hs +++ b/src/Simplex/FileTransfer/Client/Main.hs @@ -38,9 +38,8 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import Data.Char (toLower) -import Data.Function (on) import Data.Int (Int64) -import Data.List (foldl', groupBy, sortOn) +import Data.List (foldl', sortOn) import Data.List.NonEmpty (NonEmpty (..), nonEmpty) import qualified Data.List.NonEmpty as L import Data.Map (Map) @@ -66,7 +65,7 @@ import Simplex.Messaging.Encoding.String (StrEncoding (..)) import Simplex.Messaging.Parsers (parseAll) import Simplex.Messaging.Protocol (ProtoServerWithAuth (..), SenderId, SndPrivateSignKey, XFTPServer, XFTPServerWithAuth) import Simplex.Messaging.Server.CLI (getCliCommand') -import Simplex.Messaging.Util (ifM, tshow, whenM) +import Simplex.Messaging.Util (groupAllOn, ifM, tshow, whenM) import System.Exit (exitFailure) import System.FilePath (splitFileName, ()) import System.IO.Temp (getCanonicalTemporaryDirectory) @@ -316,7 +315,7 @@ cliSendFileOpts SendOptions {filePath, outputDir, numRecipients, xftpServers, re let xftpSrvs = fromMaybe defaultXFTPServers (nonEmpty xftpServers) srvs <- liftIO $ replicateM (length chunks) $ getXFTPServer gen xftpSrvs let thd3 (_, _, x) = x - chunks' = groupBy ((==) `on` thd3) $ sortOn thd3 $ zip3 [1 ..] chunks srvs + chunks' = groupAllOn thd3 $ zip3 [1 ..] chunks srvs -- TODO shuffle/unshuffle chunks -- the reason we don't do pooled downloads here within one server is that http2 library doesn't handle cleint concurrency, even though -- upload doesn't allow other requests within the same client until complete (but download does allow). @@ -428,7 +427,7 @@ cliReceiveFile ReceiveOptions {fileDescription, filePath, retryCount, tempPath, liftIO $ printNoNewLine "Downloading file..." downloadedChunks <- newTVarIO [] let srv FileChunk {replicas} = server (head replicas :: FileChunkReplica) - srvChunks = groupBy ((==) `on` srv) $ sortOn srv chunks + srvChunks = groupAllOn srv chunks chunkPaths <- map snd . sortOn fst . concat <$> pooledForConcurrentlyN 16 srvChunks (mapM $ downloadFileChunk a encPath size downloadedChunks) encDigest <- liftIO $ LC.sha512Hash <$> readChunks chunkPaths when (encDigest /= unFileDigest digest) $ throwError $ CLIError "File digest mismatch" diff --git a/src/Simplex/FileTransfer/Description.hs b/src/Simplex/FileTransfer/Description.hs index 316070f7f..0fa99e372 100644 --- a/src/Simplex/FileTransfer/Description.hs +++ b/src/Simplex/FileTransfer/Description.hs @@ -42,9 +42,8 @@ import qualified Data.Attoparsec.ByteString.Char8 as A import Data.Bifunctor (first) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B -import Data.Function (on) import Data.Int (Int64) -import Data.List (foldl', groupBy, sortOn) +import Data.List (foldl', sortOn) import Data.Map (Map) import qualified Data.Map as M import Data.Maybe (fromMaybe) @@ -59,7 +58,7 @@ import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String import Simplex.Messaging.Parsers (parseAll) import Simplex.Messaging.Protocol (XFTPServer) -import Simplex.Messaging.Util (bshow, (<$?>)) +import Simplex.Messaging.Util (bshow, groupAllOn, (<$?>)) data FileDescription (p :: FileParty) = FileDescription { party :: SFileParty p, @@ -258,9 +257,7 @@ instance (ToField a) => ToField (FileSize a) where toField (FileSize s) = toFiel groupReplicasByServer :: FileSize Word32 -> [FileChunk] -> [[FileServerReplica]] groupReplicasByServer defChunkSize = - groupBy ((==) `on` replicaServer) - . sortOn replicaServer - . unfoldChunksToReplicas defChunkSize + groupAllOn replicaServer . unfoldChunksToReplicas defChunkSize encodeFileReplicas :: FileSize Word32 -> [FileChunk] -> [YAMLServerReplicas] encodeFileReplicas defChunkSize = @@ -268,7 +265,7 @@ encodeFileReplicas defChunkSize = where encodeServerReplicas fs = YAMLServerReplicas - { server = replicaServer $ head fs, -- groupBy guarantees that fs is not empty + { server = replicaServer $ head fs, -- groupAllOn guarantees that fs is not empty chunks = map (B.unpack . encodeServerReplica) fs } diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index aacb7d54b..6a9341b74 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -208,11 +208,10 @@ import Data.Bifunctor (second) import Data.ByteString (ByteString) import qualified Data.ByteString.Base64.URL as U import Data.Char (toLower) -import Data.Function (on) import Data.Functor (($>)) import Data.IORef import Data.Int (Int64) -import Data.List (foldl', groupBy, intercalate, sortBy) +import Data.List (foldl', intercalate, sortBy) import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M @@ -250,7 +249,7 @@ import Simplex.Messaging.Parsers (blobFieldParser, dropPrefix, fromTextField_, s import Simplex.Messaging.Protocol import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Transport.Client (TransportHost) -import Simplex.Messaging.Util (bshow, diffToMilliseconds, eitherToMaybe, ($>>=), (<$$>)) +import Simplex.Messaging.Util (bshow, diffToMilliseconds, eitherToMaybe, groupOn, ($>>=), (<$$>)) import Simplex.Messaging.Version import System.Directory (copyFile, createDirectoryIfMissing, doesFileExist) import System.Exit (exitFailure) @@ -1092,7 +1091,9 @@ insertedRowId db = fromOnly . head <$> DB.query_ db "SELECT last_insert_rowid()" getPendingCommands :: DB.Connection -> ConnId -> IO [(Maybe SMPServer, [AsyncCmdId])] getPendingCommands db connId = do - map (\ids -> (fst $ head ids, map snd ids)) . groupBy ((==) `on` fst) . map srvCmdId + -- `groupOn` is used instead of `groupAllOn` to avoid extra sorting by `server + cmdId`, as the query already sorts by them. + -- TODO review whether this can break if, e.g., the server has another key hash. + map (\ids -> (fst $ head ids, map snd ids)) . groupOn fst . map srvCmdId <$> DB.query db [sql| diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 6cd14fcda..6e8b22c2f 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -219,8 +219,10 @@ data ProtocolClientConfig = ProtocolClientConfig defaultTransport :: (ServiceName, ATransport), -- | network configuration networkConfig :: NetworkConfig, - -- | SMP client-server protocol version range - smpServerVRange :: VersionRange + -- | client-server protocol version range + serverVRange :: VersionRange, + -- | delay between sending batches of commands (microseconds) + batchDelay :: Maybe Int } -- | Default protocol client configuration. @@ -230,7 +232,8 @@ defaultClientConfig = { qSize = 64, defaultTransport = ("443", transport @TLS), networkConfig = defaultNetworkConfig, - smpServerVRange = supportedSMPServerVRange + serverVRange = supportedSMPServerVRange, + batchDelay = Nothing } data Request err msg = Request @@ -276,7 +279,7 @@ type TransportSession msg = (UserId, ProtoServer msg, Maybe EntityId) -- A single queue can be used for multiple 'SMPClient' instances, -- as 'SMPServerTransmission' includes server information. getProtocolClient :: forall err msg. Protocol err msg => TransportSession msg -> ProtocolClientConfig -> Maybe (TBQueue (ServerTransmission msg)) -> (ProtocolClient err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient err msg)) -getProtocolClient transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, smpServerVRange} msgQ disconnected = do +getProtocolClient transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, serverVRange, batchDelay} msgQ disconnected = do case chooseTransportHost networkConfig (host srv) of Right useHost -> (atomically (mkProtocolClient useHost) >>= runClient useTransport useHost) @@ -329,7 +332,7 @@ getProtocolClient transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, client :: forall c. Transport c => TProxy c -> PClient err msg -> TMVar (Either (ProtocolClientError err) (ProtocolClient err msg)) -> c -> IO () client _ c cVar h = - runExceptT (protocolClientHandshake @err @msg h (keyHash srv) smpServerVRange) >>= \case + runExceptT (protocolClientHandshake @err @msg h (keyHash srv) serverVRange) >>= \case Left e -> atomically . putTMVar cVar . Left $ PCETransportError e Right th@THandle {sessionId, thVersion} -> do sessionTs <- getCurrentTime @@ -341,7 +344,7 @@ getProtocolClient transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, `finally` disconnected c' send :: Transport c => ProtocolClient err msg -> THandle c -> IO () - send ProtocolClient {client_ = PClient {sndQ}} h = forever $ atomically (readTBQueue sndQ) >>= tPut h + send ProtocolClient {client_ = PClient {sndQ}} h = forever $ atomically (readTBQueue sndQ) >>= tPut h batchDelay receive :: Transport c => ProtocolClient err msg -> THandle c -> IO () receive ProtocolClient {client_ = PClient {rcvQ}} h = forever $ tGet h >>= atomically . writeTBQueue rcvQ diff --git a/src/Simplex/Messaging/Client/Agent.hs b/src/Simplex/Messaging/Client/Agent.hs index dad9d709a..a59424f24 100644 --- a/src/Simplex/Messaging/Client/Agent.hs +++ b/src/Simplex/Messaging/Client/Agent.hs @@ -17,14 +17,16 @@ import Control.Logger.Simple import Control.Monad.Except import Control.Monad.IO.Unlift import Control.Monad.Trans.Except -import Data.Bifunctor (first) +import Data.Bifunctor (first, bimap) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B -import Data.List (find, partition) +import Data.Either (partitionEithers) +import Data.List (partition) import Data.List.NonEmpty (NonEmpty) import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) import qualified Data.Map.Strict as M +import Data.Maybe (listToMaybe) import Data.Set (Set) import Data.Text.Encoding import Data.Tuple (swap) @@ -43,7 +45,6 @@ import UnliftIO (async) import UnliftIO.Exception (Exception) import qualified UnliftIO.Exception as E import UnliftIO.STM -import Data.Either (isLeft) type SMPClientVar = TMVar (Either SMPClientError SMPClient) @@ -51,8 +52,8 @@ data SMPClientAgentEvent = CAConnected SMPServer | CADisconnected SMPServer (Set SMPSub) | CAReconnected SMPServer - | CAResubscribed SMPServer SMPSub - | CASubError SMPServer SMPSub SMPClientError + | CAResubscribed SMPServer (NonEmpty SMPSub) + | CASubError SMPServer (NonEmpty (SMPSub, SMPClientError)) data SMPSubParty = SPRecipient | SPNotifier deriving (Eq, Ord, Show) @@ -208,45 +209,36 @@ getSMPServerClient' ca@SMPClientAgent {agentCfg, smpClients, msgQ} srv = reconnectClient :: ExceptT SMPClientError IO () reconnectClient = do withSMP ca srv $ \smp -> do - liftIO . notify $ CAReconnected srv + liftIO $ notify $ CAReconnected srv cs_ <- atomically $ mapM readTVar =<< TM.lookup srv (pendingSrvSubs ca) forM_ cs_ $ \cs -> do subs' <- filterM (fmap not . atomically . hasSub (srvSubs ca) srv . fst) $ M.assocs cs let (nSubs, rSubs) = partition (isNotifier . fst . fst) subs' - nRs <- liftIO $ subscribe_ smp SPNotifier nSubs - rRs <- liftIO $ subscribe_ smp SPRecipient rSubs - case find isLeft $ nRs <> rRs of - Just (Left e) -> throwE e - _ -> pure () + subscribe_ smp SPNotifier nSubs + subscribe_ smp SPRecipient rSubs where isNotifier = \case SPNotifier -> True SPRecipient -> False - subscribe_ :: SMPClient -> SMPSubParty -> [(SMPSub, C.APrivateSignKey)] -> IO [Either SMPClientError ()] + subscribe_ :: SMPClient -> SMPSubParty -> [(SMPSub, C.APrivateSignKey)] -> ExceptT SMPClientError IO () subscribe_ smp party subs = case L.nonEmpty subs of Just subs' -> do - let subs'' = L.map (first snd) subs' - rs <- L.zip subs'' <$> smpSubscribeQueues party ca smp srv subs'' - rs' <- forM rs $ \(sub, r) -> do - let sub' = first (party,) sub - s = fst sub' - case snd r of - Right () -> do - atomically $ addSubscription ca srv sub' - notify $ CAResubscribed srv s - pure $ Right () - Left e -> do - case e of - PCEResponseTimeout -> pure $ Left e - PCENetworkError -> pure $ Left e - _ -> do - notify $ CASubError srv s e - atomically $ removePendingSubscription ca srv s - pure $ Right () - pure $ L.toList rs' - Nothing -> pure [] + let subs'' :: (NonEmpty (QueueId, C.APrivateSignKey)) = L.map (first snd) subs' + rs <- liftIO $ smpSubscribeQueues party ca smp srv subs'' + let rs' :: (NonEmpty ((SMPSub, C.APrivateSignKey), Either SMPClientError ())) = + L.zipWith (first . const) subs' rs + rs'' :: [Either (SMPSub, SMPClientError) (SMPSub, C.APrivateSignKey)] = + map (\(sub, r) -> bimap (fst sub,) (const sub) r) $ L.toList rs' + (errs, oks) = partitionEithers rs'' + (tempErrs, finalErrs) = partition (temporaryClientError . snd) errs + mapM_ (atomically . addSubscription ca srv) oks + mapM_ (liftIO . notify . CAResubscribed srv) $ L.nonEmpty $ map fst oks + mapM_ (atomically . removePendingSubscription ca srv . fst) finalErrs + mapM_ (liftIO . notify . CASubError srv) $ L.nonEmpty finalErrs + mapM_ (throwE . snd) $ listToMaybe tempErrs + Nothing -> pure () notify :: SMPClientAgentEvent -> IO () notify evt = atomically $ writeTBQueue (agentQ ca) evt diff --git a/src/Simplex/Messaging/Notifications/Protocol.hs b/src/Simplex/Messaging/Notifications/Protocol.hs index 90e32bc38..34f93d8a7 100644 --- a/src/Simplex/Messaging/Notifications/Protocol.hs +++ b/src/Simplex/Messaging/Notifications/Protocol.hs @@ -429,7 +429,7 @@ data NtfSubStatus NSAuth | -- | SMP error other than AUTH NSErr ByteString - deriving (Eq, Show) + deriving (Eq, Ord, Show) ntfShouldSubscribe :: NtfSubStatus -> Bool ntfShouldSubscribe = \case diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 209975d5b..8c6c6bc03 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -16,16 +16,17 @@ import Control.Concurrent.STM (stateTVar) import Control.Logger.Simple import Control.Monad.Except import Control.Monad.Reader +import Data.Bifunctor (second) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B -import Data.Function (on) import Data.Functor (($>)) import Data.Int (Int64) -import Data.List (intercalate) +import Data.List (intercalate, sort) import Data.List.NonEmpty (NonEmpty(..)) import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) import qualified Data.Map.Strict as M +import Data.Maybe (catMaybes) import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1) import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) @@ -55,11 +56,10 @@ import System.Exit (exitFailure) import System.IO (BufferMode (..), hPutStrLn, hSetBuffering) import System.Mem.Weak (deRefWeak) import UnliftIO (IOMode (..), async, uninterruptibleCancel, withFile) -import UnliftIO.Concurrent (forkIO, killThread, mkWeakThreadId, threadDelay) +import UnliftIO.Concurrent (forkIO, killThread, mkWeakThreadId) import UnliftIO.Directory (doesFileExist, renameFile) import UnliftIO.Exception import UnliftIO.STM -import Data.Bifunctor (second) runNtfServer :: NtfServerConfig -> IO () runNtfServer cfg = do @@ -147,23 +147,28 @@ ntfServer cfg@NtfServerConfig {transports, logTLSErrors} started = do resubscribe :: NtfSubscriber -> Map NtfSubscriptionId NtfSubData -> M () resubscribe NtfSubscriber {newSubQ} subs = do subs' <- atomically $ filterM (fmap ntfShouldSubscribe . readTVar . subStatus) $ M.elems subs - mapM_ (atomically . writeTBQueue newSubQ . L.map NtfSub) $ L.nonEmpty subs' - liftIO $ logInfo "SMP connections resubscribed" + atomically . writeTBQueue newSubQ $ map NtfSub subs' + liftIO $ logInfo $ "SMP resubscriptions queued (" <> tshow (length subs') <> " subscriptions)" ntfSubscriber :: NtfSubscriber -> M () ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = do raceAny_ [subscribe, receiveSMP, receiveAgent] where subscribe :: M () - subscribe = do - d <- asks $ resubscribeDelay . config - forever $ do - subs <- atomically (readTBQueue newSubQ) - let ss = L.groupBy ((==) `on` server) subs - forM_ ss $ \serverSubs -> do - SMPSubscriber {newSubQ = subscriberSubQ} <- getSMPSubscriber $ server $ L.head serverSubs - atomically $ writeTQueue subscriberSubQ serverSubs - when (length serverSubs > 10) $ threadDelay d + subscribe = forever $ do + subs <- atomically (readTBQueue newSubQ) + let ss = L.groupAllWith server subs + forM_ ss $ \serverSubs -> do + let srv = server $ L.head serverSubs + batches = toChunks 900 $ L.toList serverSubs + SMPSubscriber {newSubQ = subscriberSubQ} <- getSMPSubscriber srv + mapM_ (atomically . writeTQueue subscriberSubQ) batches + + toChunks :: Int -> [a] -> [NonEmpty a] + toChunks _ [] = [] + toChunks n xs = + let (ys, xs') = splitAt n xs + in maybe id (:) (L.nonEmpty ys) (toChunks n xs') server :: NtfEntityRec 'Subscription -> SMPServer server (NtfSub sub) = ntfSubServer sub @@ -184,21 +189,26 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge forever $ do subs <- atomically (peekTQueue subscriberSubQ) let subs' = L.map (\(NtfSub sub) -> sub) subs + srv = server $ L.head subs + logSubStatus srv "subscribing" $ length subs mapM_ (\NtfSubData {smpQueue} -> updateSubStatus smpQueue NSPending) subs' - rs <- liftIO $ subscribeQueues (server $ L.head subs) subs' - subs_ <- L.nonEmpty <$> foldM process [] rs + rs <- liftIO $ subscribeQueues srv subs' + (subs'', oks, errs) <- foldM process ([], 0, []) rs atomically $ do void $ readTQueue subscriberSubQ - mapM_ (writeTQueue subscriberSubQ . L.map NtfSub) subs_ + mapM_ (writeTQueue subscriberSubQ . L.map NtfSub) $ L.nonEmpty subs'' + logSubStatus srv "retrying" $ length subs'' + logSubStatus srv "subscribed" oks + logSubErrors srv errs where - process subs (sub@NtfSubData {smpQueue}, r) = case r of - Right _ -> updateSubStatus smpQueue NSActive $> subs - Left err -> do - handleSubError smpQueue err - pure $ case err of - PCEResponseTimeout -> sub : subs - PCENetworkError -> sub : subs - _ -> subs + process :: ([NtfSubData], Int, [NtfSubStatus]) -> (NtfSubData, Either SMPClientError ()) -> M ([NtfSubData], Int, [NtfSubStatus]) + process (subs, oks, errs) (sub@NtfSubData {smpQueue}, r) = case r of + Right _ -> updateSubStatus smpQueue NSActive $> (subs, oks + 1, errs) + Left e -> update <$> handleSubError smpQueue e + where + update = \case + Just err -> (subs, oks, err : errs) -- permanent error, log and don't retry subscription + Nothing -> (sub : subs, oks, errs) -- temporary error, retry subscription -- | Subscribe to queues. The list of results can have a different order. subscribeQueues :: SMPServer -> NonEmpty NtfSubData -> IO (NonEmpty (NtfSubData, Either SMPClientError ())) @@ -230,37 +240,43 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge atomically (readTBQueue agentQ) >>= \case CAConnected _ -> pure () CADisconnected srv subs -> do - logInfo $ "SMP server disconnected " <> showServer' srv <> " (" <> tshow (length subs) <> ") subscriptions" + logSubStatus srv "disconnected" $ length subs forM_ subs $ \(_, ntfId) -> do let smpQueue = SMPQueueNtf srv ntfId updateSubStatus smpQueue NSInactive CAReconnected srv -> logInfo $ "SMP server reconnected " <> showServer' srv - CAResubscribed srv sub -> do - let ntfId = snd sub - smpQueue = SMPQueueNtf srv ntfId - updateSubStatus smpQueue NSActive - CASubError srv (_, ntfId) err -> do - logError $ "SMP subscription error on server " <> showServer' srv <> ": " <> tshow err - handleSubError (SMPQueueNtf srv ntfId) err - where - showServer' = decodeLatin1 . strEncode . host + CAResubscribed srv subs -> do + forM_ subs $ \(_, ntfId) -> updateSubStatus (SMPQueueNtf srv ntfId) NSActive + logSubStatus srv "resubscribed" $ length subs + CASubError srv errs -> + forM errs (\((_, ntfId), err) -> handleSubError (SMPQueueNtf srv ntfId) err) + >>= logSubErrors srv . catMaybes . L.toList - handleSubError :: SMPQueueNtf -> SMPClientError -> M () + logSubStatus srv event n = when (n > 0) $ + logInfo $ "SMP server " <> event <> " " <> showServer' srv <> " (" <> tshow n <> " subscriptions)" + + logSubErrors :: SMPServer -> [NtfSubStatus] -> M () + logSubErrors srv errs = forM_ (L.group $ sort errs) $ \errs' -> do + logError $ "SMP subscription errors on server " <> showServer' srv <> ": " <> tshow (L.head errs') <> " (" <> tshow (length errs') <> " errors)" + + showServer' = decodeLatin1 . strEncode . host + + handleSubError :: SMPQueueNtf -> SMPClientError -> M (Maybe NtfSubStatus) handleSubError smpQueue = \case - PCEProtocolError AUTH -> updateSubStatus smpQueue NSAuth + PCEProtocolError AUTH -> updateSubStatus smpQueue NSAuth $> Just NSAuth PCEProtocolError e -> updateErr "SMP error " e - PCEIOError e -> updateErr "IOError " e PCEResponseError e -> updateErr "ResponseError " e PCEUnexpectedResponse r -> updateErr "UnexpectedResponse " r PCETransportError e -> updateErr "TransportError " e PCECryptoError e -> updateErr "CryptoError " e - PCEIncompatibleHost -> updateSubStatus smpQueue $ NSErr "IncompatibleHost" - PCEResponseTimeout -> pure () - PCENetworkError -> pure () + PCEIncompatibleHost -> let e = NSErr "IncompatibleHost" in updateSubStatus smpQueue e $> Just e + PCEResponseTimeout -> pure Nothing + PCENetworkError -> pure Nothing + PCEIOError _ -> pure Nothing where - updateErr :: Show e => ByteString -> e -> M () - updateErr errType e = updateSubStatus smpQueue . NSErr $ errType <> bshow e + updateErr :: Show e => ByteString -> e -> M (Maybe NtfSubStatus) + updateErr errType e = updateSubStatus smpQueue (NSErr $ errType <> bshow e) $> Just (NSErr errType) updateSubStatus smpQueue status = do st <- asks store @@ -354,7 +370,7 @@ receive th NtfServerClient {rcvQ, sndQ, activeAt} = forever $ do send :: Transport c => THandle c -> NtfServerClient -> IO () send h@THandle {thVersion = v} NtfServerClient {sndQ, sessionId, activeAt} = forever $ do t <- atomically $ readTBQueue sndQ - void . liftIO $ tPut h [(Nothing, encodeTransmission v sessionId t)] + void . liftIO $ tPut h Nothing [(Nothing, encodeTransmission v sessionId t)] atomically . writeTVar activeAt =<< liftIO getSystemTime -- instance Show a => Show (TVar a) where diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs index 5bff6df21..f9f3926c5 100644 --- a/src/Simplex/Messaging/Notifications/Server/Env.hs +++ b/src/Simplex/Messaging/Notifications/Server/Env.hs @@ -48,7 +48,6 @@ data NtfServerConfig = NtfServerConfig apnsConfig :: APNSPushClientConfig, inactiveClientExpiration :: Maybe ExpirationConfig, storeLogFile :: Maybe FilePath, - resubscribeDelay :: Int, -- microseconds -- CA certificate private key is not needed for initialization caCertificateFile :: FilePath, privateKeyFile :: FilePath, @@ -94,7 +93,7 @@ newNtfServerEnv config@NtfServerConfig {subQSize, pushQSize, smpAgentCfg, apnsCo data NtfSubscriber = NtfSubscriber { smpSubscribers :: TMap SMPServer SMPSubscriber, - newSubQ :: TBQueue (NonEmpty (NtfEntityRec 'Subscription)), + newSubQ :: TBQueue [NtfEntityRec 'Subscription], smpAgent :: SMPClientAgent } diff --git a/src/Simplex/Messaging/Notifications/Server/Main.hs b/src/Simplex/Messaging/Notifications/Server/Main.hs index 5eee65e71..efe6aeb2a 100644 --- a/src/Simplex/Messaging/Notifications/Server/Main.hs +++ b/src/Simplex/Messaging/Notifications/Server/Main.hs @@ -14,7 +14,8 @@ import Data.Maybe (fromMaybe) import qualified Data.Text as T import Network.Socket (HostName) import Options.Applicative -import Simplex.Messaging.Client.Agent (defaultSMPClientAgentConfig) +import Simplex.Messaging.Client (ProtocolClientConfig (..)) +import Simplex.Messaging.Client.Agent (SMPClientAgentConfig (..), defaultSMPClientAgentConfig) import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Notifications.Server (runNtfServer) import Simplex.Messaging.Notifications.Server.Env (NtfServerConfig (..)) @@ -28,7 +29,10 @@ import System.IO (BufferMode (..), hSetBuffering, stderr, stdout) import Text.Read (readMaybe) ntfServerVersion :: String -ntfServerVersion = "1.4.1" +ntfServerVersion = "1.4.2" + +defaultSMPBatchDelay :: Int +defaultSMPBatchDelay = 10000 ntfServerCLI :: FilePath -> FilePath -> IO () ntfServerCLI cfgPath logPath = @@ -80,7 +84,9 @@ ntfServerCLI cfgPath logPath = <> ("host: " <> host <> "\n") <> ("port: " <> defaultServerPort <> "\n") <> "log_tls_errors: off\n\ - \websockets: off\n" + \# delay between command batches sent to SMP relays (microseconds), 0 to disable\n" + <> ("smp_batch_delay: " <> show defaultSMPBatchDelay <> "\n") + <> "websockets: off\n" runServer ini = do hSetBuffering stdout LineBuffering hSetBuffering stderr LineBuffering @@ -96,19 +102,20 @@ ntfServerCLI cfgPath logPath = enableStoreLog = settingIsOn "STORE_LOG" "enable" ini logStats = settingIsOn "STORE_LOG" "log_stats" ini c = combine cfgPath . ($ defaultX509Config) + smpBatchDelay = readIniDefault defaultSMPBatchDelay "TRANSPORT" "smp_batch_delay" ini + batchDelay = if smpBatchDelay <= 0 then Nothing else Just smpBatchDelay serverConfig = NtfServerConfig { transports = iniTransports ini, subIdBytes = 24, regCodeBytes = 32, - clientQSize = 16, - subQSize = 64, - pushQSize = 128, - smpAgentCfg = defaultSMPClientAgentConfig, + clientQSize = 64, + subQSize = 512, + pushQSize = 1048, + smpAgentCfg = defaultSMPClientAgentConfig {smpCfg = (smpCfg defaultSMPClientAgentConfig) {batchDelay}}, apnsConfig = defaultAPNSPushClientConfig, inactiveClientExpiration = Nothing, storeLogFile = enableStoreLog $> storeLogFilePath, - resubscribeDelay = 50000, -- 50ms caCertificateFile = c caCrtFile, privateKeyFile = c serverKeyFile, certificateFile = c serverCrtFile, diff --git a/src/Simplex/Messaging/Protocol.hs b/src/Simplex/Messaging/Protocol.hs index 31245e887..31bf0ee4c 100644 --- a/src/Simplex/Messaging/Protocol.hs +++ b/src/Simplex/Messaging/Protocol.hs @@ -146,6 +146,7 @@ module Simplex.Messaging.Protocol where import Control.Applicative (optional, (<|>)) +import Control.Concurrent (threadDelay) import Control.Monad.Except import Data.Aeson (FromJSON (..), ToJSON (..)) import qualified Data.Aeson as J @@ -1244,8 +1245,8 @@ instance Encoding CommandError where _ -> fail "bad command error type" -- | Send signed SMP transmission to TCP transport. -tPut :: Transport c => THandle c -> NonEmpty SentRawTransmission -> IO [Either TransportError ()] -tPut th trs +tPut :: Transport c => THandle c -> Maybe Int -> NonEmpty SentRawTransmission -> IO [Either TransportError ()] +tPut th delay_ trs | batch th = tPutBatch [] $ L.map tEncode trs | otherwise = forM (L.toList trs) $ tPutLog . tEncode where @@ -1255,7 +1256,7 @@ tPut th trs r <- if n == 0 then largeMsg else replicate n <$> tPutLog (tEncodeBatch n s) let rs' = rs <> r case ts_ of - Just ts' -> tPutBatch rs' ts' + Just ts' -> mapM_ threadDelay delay_ >> tPutBatch rs' ts' _ -> pure rs' largeMsg = putStrLn "tPut error: large message" >> pure [Left TELargeMsg] tPutLog s = do diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index c02d6927d..e8d664799 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -281,7 +281,7 @@ receive th Client {rcvQ, sndQ, activeAt} = forever $ do send :: Transport c => THandle c -> Client -> IO () send h@THandle {thVersion = v} Client {sndQ, sessionId, activeAt} = forever $ do ts <- atomically $ L.sortWith tOrder <$> readTBQueue sndQ - void . liftIO . tPut h $ L.map ((Nothing,) . encodeTransmission v sessionId) ts + void . liftIO . tPut h Nothing $ L.map ((Nothing,) . encodeTransmission v sessionId) ts atomically . writeTVar activeAt =<< liftIO getSystemTime where tOrder :: Transmission BrokerMsg -> Int diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 80f9bc18a..004f72971 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -164,7 +164,7 @@ smpServerCLI cfgPath logPath = serverConfig = ServerConfig { transports = iniTransports ini, - tbqSize = 32, + tbqSize = 64, serverTbqSize = 1024, msgQueueQuota = 128, queueIdBytes = 24, diff --git a/src/Simplex/Messaging/Util.hs b/src/Simplex/Messaging/Util.hs index 9db3901aa..ec3678b39 100644 --- a/src/Simplex/Messaging/Util.hs +++ b/src/Simplex/Messaging/Util.hs @@ -18,6 +18,7 @@ import qualified Data.Text as T import Data.Text.Encoding (decodeUtf8With) import Data.Time (NominalDiffTime) import UnliftIO.Async +import Data.List (groupBy, sortOn) raceAny_ :: MonadUnliftIO m => [m a] -> m () raceAny_ = r [] @@ -102,6 +103,17 @@ eitherToMaybe :: Either a b -> Maybe b eitherToMaybe = either (const Nothing) Just {-# INLINE eitherToMaybe #-} +groupOn :: Eq k => (a -> k) -> [a] -> [[a]] +groupOn = groupBy . eqOn + -- it is equivalent to groupBy ((==) `on` f), + -- but it redefines `on` to avoid duplicate computation for most values. + -- source: https://hackage.haskell.org/package/extra-1.7.13/docs/src/Data.List.Extra.html#groupOn + -- the on2 in this package is specialized to only use `==` as the function, `eqOn f` is equivalent to `(==) `on` f` + where eqOn f = \x -> let fx = f x in \y -> fx == f y + +groupAllOn :: Ord k => (a -> k) -> [a] -> [[a]] +groupAllOn f = groupOn f . sortOn f + safeDecodeUtf8 :: ByteString -> Text safeDecodeUtf8 = decodeUtf8With onError where diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index e33c52a09..3d30f6548 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -105,7 +105,7 @@ pattern Msg :: MsgBody -> ACommand 'Agent e pattern Msg msgBody <- MSG MsgMeta {integrity = MsgOk} _ msgBody smpCfgV1 :: ProtocolClientConfig -smpCfgV1 = (smpCfg agentCfg) {smpServerVRange = vr11} +smpCfgV1 = (smpCfg agentCfg) {serverVRange = vr11} agentCfgV1 :: AgentConfig agentCfgV1 = agentCfg {smpAgentVRange = vr11, smpClientVRange = vr11, e2eEncryptVRange = vr11, smpCfg = smpCfgV1} @@ -1381,7 +1381,7 @@ testCreateQueueAuth clnt1 clnt2 = do where getClient (clntAuth, clntVersion) = let servers = initAgentServers {smp = userServers [ProtoServerWithAuth testSMPServer clntAuth]} - smpCfg = (defaultClientConfig :: ProtocolClientConfig) {smpServerVRange = mkVersionRange 4 clntVersion} + smpCfg = (defaultClientConfig :: ProtocolClientConfig) {serverVRange = mkVersionRange 4 clntVersion} in getSMPAgentClient' agentCfg {smpCfg} servers testDB testSMPServerConnectionTest :: ATransport -> Maybe BasicAuth -> SMPServerWithAuth -> IO (Maybe ProtocolTestFailure) diff --git a/tests/NtfClient.hs b/tests/NtfClient.hs index f0a9a1d47..3984ff881 100644 --- a/tests/NtfClient.hs +++ b/tests/NtfClient.hs @@ -90,7 +90,6 @@ ntfServerCfg = }, inactiveClientExpiration = Just defaultInactiveClientExpiration, storeLogFile = Nothing, - resubscribeDelay = 1000, -- CA certificate private key is not needed for initialization caCertificateFile = "tests/fixtures/ca.crt", privateKeyFile = "tests/fixtures/server.key", @@ -134,7 +133,7 @@ ntfServerTest _ t = runNtfTest $ \h -> tPut' h t >> tGet' h where tPut' h (sig, corrId, queueId, smp) = do let t' = smpEncode (sessionId (h :: THandle c), corrId, queueId, smp) - [Right ()] <- tPut h [(sig, t')] + [Right ()] <- tPut h Nothing [(sig, t')] pure () tGet' h = do [(Nothing, _, (CorrId corrId, qId, Right cmd))] <- tGet h diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 7d9113fdd..3e60b9061 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -159,7 +159,7 @@ smpServerTest _ t = runSmpTest $ \h -> tPut' h t >> tGet' h where tPut' h (sig, corrId, queueId, smp) = do let t' = smpEncode (sessionId (h :: THandle c), corrId, queueId, smp) - [Right ()] <- tPut h [(sig, t')] + [Right ()] <- tPut h Nothing [(sig, t')] pure () tGet' h = do [(Nothing, _, (CorrId corrId, qId, Right cmd))] <- tGet h diff --git a/tests/ServerTests.hs b/tests/ServerTests.hs index 581e18166..74b24bbc3 100644 --- a/tests/ServerTests.hs +++ b/tests/ServerTests.hs @@ -88,7 +88,7 @@ signSendRecv h@THandle {thVersion, sessionId} pk (corrId, qId, cmd) = do tPut1 :: Transport c => THandle c -> SentRawTransmission -> IO (Either TransportError ()) tPut1 h t = do - [r] <- tPut h [t] + [r] <- tPut h Nothing [t] pure r tGet1 :: (ProtocolEncoding err cmd, Transport c, MonadIO m, MonadFail m) => THandle c -> m (SignedTransmission err cmd)