From bdff274f56256f04344480e2f1933e3c2f6c2756 Mon Sep 17 00:00:00 2001 From: spaced4ndy <8711996+spaced4ndy@users.noreply.github.com> Date: Tue, 4 Apr 2023 14:51:13 +0400 Subject: [PATCH] use Int64 for time calculations (#706) --- src/Simplex/FileTransfer/Agent.hs | 2 +- src/Simplex/FileTransfer/Server.hs | 9 ++++--- src/Simplex/FileTransfer/Server/Env.hs | 4 +-- src/Simplex/FileTransfer/Types.hs | 2 +- src/Simplex/Messaging/Agent.hs | 5 ++-- src/Simplex/Messaging/Agent/Env/SQLite.hs | 5 ++-- .../Messaging/Agent/NtfSubSupervisor.hs | 13 +++------ src/Simplex/Messaging/Agent/RetryInterval.hs | 27 ++++++++++--------- src/Simplex/Messaging/Agent/Store/SQLite.hs | 8 +++--- src/Simplex/Messaging/Client.hs | 7 +++-- src/Simplex/Messaging/Notifications/Server.hs | 9 ++++--- .../Messaging/Notifications/Server/Env.hs | 5 ++-- src/Simplex/Messaging/Server.hs | 11 ++++---- src/Simplex/Messaging/Server/Env/STM.hs | 5 ++-- src/Simplex/Messaging/Server/Expiration.hs | 2 +- src/Simplex/Messaging/Util.hs | 26 ++++++++++++++++++ 16 files changed, 83 insertions(+), 57 deletions(-) diff --git a/src/Simplex/FileTransfer/Agent.hs b/src/Simplex/FileTransfer/Agent.hs index ab4beffb9..eb5a7fa41 100644 --- a/src/Simplex/FileTransfer/Agent.hs +++ b/src/Simplex/FileTransfer/Agent.hs @@ -153,7 +153,7 @@ runXFTPWorker c srv doWork = do downloadFileChunk fc replica `catchError` retryOnError delay' loop (workerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) . show) where - retryOnError :: Int -> m () -> (AgentErrorType -> m ()) -> AgentErrorType -> m () + retryOnError :: Int64 -> m () -> (AgentErrorType -> m ()) -> AgentErrorType -> m () retryOnError replicaDelay loop done e = do logError $ "XFTP worker error: " <> tshow e if temporaryAgentError e diff --git a/src/Simplex/FileTransfer/Server.hs b/src/Simplex/FileTransfer/Server.hs index f43fcb7c4..7b94738b0 100644 --- a/src/Simplex/FileTransfer/Server.hs +++ b/src/Simplex/FileTransfer/Server.hs @@ -22,6 +22,7 @@ import Data.ByteString.Builder (byteString) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) +import Data.Int (Int64) import Data.List (intercalate) import Data.List.NonEmpty (NonEmpty) import qualified Data.List.NonEmpty as L @@ -96,7 +97,7 @@ xftpServer cfg@XFTPServerConfig {xftpPort, logTLSErrors} started = do st <- asks store let interval = checkInterval expCfg * 1000000 forever $ do - threadDelay interval + liftIO $ threadDelay' interval old <- liftIO $ expireBeforeEpoch expCfg sIds <- M.keysSet <$> readTVarIO (files st) forM_ sIds $ \sId -> do @@ -118,11 +119,11 @@ xftpServer cfg@XFTPServerConfig {xftpPort, logTLSErrors} started = do [logServerStats logStatsStartTime interval serverStatsLogFile] serverStatsThread_ _ = [] - logServerStats :: Int -> Int -> FilePath -> M () + logServerStats :: Int64 -> Int64 -> FilePath -> M () logServerStats startAt logInterval statsFilePath = do initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath - threadDelay $ 1_000_000 * (initialDelay + if initialDelay < 0 then 86_400 else 0) + liftIO $ threadDelay' $ 1_000_000 * (initialDelay + if initialDelay < 0 then 86_400 else 0) FileServerStats {fromTime, filesCreated, fileRecipients, filesUploaded, filesDeleted, filesDownloaded, fileDownloads, fileDownloadAcks, filesCount, filesSize} <- asks serverStats let interval = 1_000_000 * logInterval forever $ do @@ -155,7 +156,7 @@ xftpServer cfg@XFTPServerConfig {xftpPort, logTLSErrors} started = do show filesCount', show filesSize' ] - threadDelay interval + liftIO $ threadDelay' interval data ServerFile = ServerFile { filePath :: FilePath, diff --git a/src/Simplex/FileTransfer/Server/Env.hs b/src/Simplex/FileTransfer/Server/Env.hs index 08ddf9ade..7f5298e9c 100644 --- a/src/Simplex/FileTransfer/Server/Env.hs +++ b/src/Simplex/FileTransfer/Server/Env.hs @@ -51,8 +51,8 @@ data XFTPServerConfig = XFTPServerConfig privateKeyFile :: FilePath, certificateFile :: FilePath, -- stats config - see SMP server config - logStatsInterval :: Maybe Int, - logStatsStartTime :: Int, + logStatsInterval :: Maybe Int64, + logStatsStartTime :: Int64, serverStatsLogFile :: FilePath, serverStatsBackupFile :: Maybe FilePath, logTLSErrors :: Bool diff --git a/src/Simplex/FileTransfer/Types.hs b/src/Simplex/FileTransfer/Types.hs index 578d98ac6..b70d9c1c7 100644 --- a/src/Simplex/FileTransfer/Types.hs +++ b/src/Simplex/FileTransfer/Types.hs @@ -101,7 +101,7 @@ data RcvFileChunkReplica = RcvFileChunkReplica replicaId :: ChunkReplicaId, replicaKey :: C.APrivateSignKey, received :: Bool, - delay :: Maybe Int, + delay :: Maybe Int64, retries :: Int } deriving (Eq, Show) diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs index 4231fc9d9..b33fad818 100644 --- a/src/Simplex/Messaging/Agent.hs +++ b/src/Simplex/Messaging/Agent.hs @@ -1612,7 +1612,8 @@ subscriber c@AgentClient {msgQ} = forever $ do cleanupManager :: forall m. AgentMonad' m => AgentClient -> m () cleanupManager c@AgentClient {subQ} = do - threadDelay =<< asks (initialCleanupDelay . config) + delay <- asks (initialCleanupDelay . config) + liftIO $ threadDelay' delay int <- asks (cleanupInterval . config) forever $ do void . runExceptT $ do @@ -1620,7 +1621,7 @@ cleanupManager c@AgentClient {subQ} = do deleteRcvFilesExpired `catchError` (notify "" . RFERR) deleteRcvFilesDeleted `catchError` (notify "" . RFERR) deleteRcvFilesTmpPaths `catchError` (notify "" . RFERR) - threadDelay int + liftIO $ threadDelay' int where deleteConns = withLock (deleteLock c) "cleanupManager" $ do diff --git a/src/Simplex/Messaging/Agent/Env/SQLite.hs b/src/Simplex/Messaging/Agent/Env/SQLite.hs index 87a949322..7cb7509f0 100644 --- a/src/Simplex/Messaging/Agent/Env/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Env/SQLite.hs @@ -30,6 +30,7 @@ import Control.Monad.Except import Control.Monad.IO.Unlift import Control.Monad.Reader import Crypto.Random +import Data.Int (Int64) import Data.List.NonEmpty (NonEmpty) import Data.Map (Map) import Data.Time.Clock (NominalDiffTime, nominalDay) @@ -79,8 +80,8 @@ data AgentConfig = AgentConfig messageRetryInterval :: RetryInterval2, messageTimeout :: NominalDiffTime, helloTimeout :: NominalDiffTime, - initialCleanupDelay :: Int, - cleanupInterval :: Int, + initialCleanupDelay :: Int64, + cleanupInterval :: Int64, rcvFilesTTL :: NominalDiffTime, xftpNotifyErrsOnRetry :: Bool, deleteErrorCount :: Int, diff --git a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs index 8ba0ea863..e16c3d9e6 100644 --- a/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs +++ b/src/Simplex/Messaging/Agent/NtfSubSupervisor.hs @@ -23,10 +23,9 @@ import Control.Monad import Control.Monad.Except import Control.Monad.Reader import Data.Bifunctor (first) -import Data.Fixed (Fixed (MkFixed), Pico) import qualified Data.Map.Strict as M import Data.Text (Text) -import Data.Time (UTCTime, addUTCTime, diffUTCTime, getCurrentTime, nominalDiffTimeToSeconds) +import Data.Time (UTCTime, addUTCTime, getCurrentTime) import Simplex.Messaging.Agent.Client import Simplex.Messaging.Agent.Env.SQLite import Simplex.Messaging.Agent.Protocol (ACommand (..), APartyCmd (..), AgentErrorType (..), BrokerErrorType (..), ConnId, NotificationsMode (..), SAEntity (..)) @@ -40,7 +39,7 @@ import Simplex.Messaging.Notifications.Types import Simplex.Messaging.Protocol (NtfServer, ProtocolServer, SMPServer, sameSrvAddr) import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Util (tshow, unlessM) +import Simplex.Messaging.Util (diffInMicros, threadDelay', tshow, unlessM) import System.Random (randomR) import UnliftIO import UnliftIO.Concurrent (forkIO, threadDelay) @@ -292,16 +291,10 @@ rescheduleAction doWork ts actionTs | otherwise = do void . atomically $ tryTakeTMVar doWork void . forkIO $ do - threadDelay $ diffInMicros actionTs ts + liftIO $ threadDelay' $ diffInMicros actionTs ts void . atomically $ tryPutTMVar doWork () pure True -fromPico :: Pico -> Integer -fromPico (MkFixed i) = i - -diffInMicros :: UTCTime -> UTCTime -> Int -diffInMicros a b = (`div` 1000000) . fromInteger . fromPico . nominalDiffTimeToSeconds $ diffUTCTime a b - retryOnError :: AgentMonad' m => AgentClient -> Text -> m () -> (AgentErrorType -> m ()) -> AgentErrorType -> m () retryOnError c name loop done e = do logError $ name <> " error: " <> tshow e diff --git a/src/Simplex/Messaging/Agent/RetryInterval.hs b/src/Simplex/Messaging/Agent/RetryInterval.hs index 1ad820a53..7f88592fc 100644 --- a/src/Simplex/Messaging/Agent/RetryInterval.hs +++ b/src/Simplex/Messaging/Agent/RetryInterval.hs @@ -14,16 +14,17 @@ module Simplex.Messaging.Agent.RetryInterval ) where -import Control.Concurrent (forkIO, threadDelay) +import Control.Concurrent (forkIO) import Control.Monad (void) import Control.Monad.IO.Class (MonadIO, liftIO) -import Simplex.Messaging.Util (whenM) +import Data.Int (Int64) +import Simplex.Messaging.Util (threadDelay', whenM) import UnliftIO.STM data RetryInterval = RetryInterval - { initialInterval :: Int, - increaseAfter :: Int, - maxInterval :: Int + { initialInterval :: Int64, + increaseAfter :: Int64, + maxInterval :: Int64 } data RetryInterval2 = RetryInterval2 @@ -32,8 +33,8 @@ data RetryInterval2 = RetryInterval2 } data RI2State = RI2State - { slowInterval :: Int, - fastInterval :: Int + { slowInterval :: Int64, + fastInterval :: Int64 } deriving (Show) @@ -47,14 +48,14 @@ updateRetryInterval2 RI2State {slowInterval, fastInterval} RetryInterval2 {riSlo data RetryIntervalMode = RISlow | RIFast deriving (Eq, Show) -withRetryInterval :: forall m. MonadIO m => RetryInterval -> (Int -> m () -> m ()) -> m () +withRetryInterval :: forall m. MonadIO m => RetryInterval -> (Int64 -> m () -> m ()) -> m () withRetryInterval ri action = callAction 0 $ initialInterval ri where - callAction :: Int -> Int -> m () + callAction :: Int64 -> Int64 -> m () callAction elapsed delay = action delay loop where loop = do - liftIO $ threadDelay delay + liftIO $ threadDelay' delay let elapsed' = elapsed + delay callAction elapsed' $ nextDelay elapsed' delay ri @@ -63,7 +64,7 @@ withRetryLock2 :: forall m. MonadIO m => RetryInterval2 -> TMVar () -> (RI2State withRetryLock2 RetryInterval2 {riSlow, riFast} lock action = callAction (0, initialInterval riSlow) (0, initialInterval riFast) where - callAction :: (Int, Int) -> (Int, Int) -> m () + callAction :: (Int64, Int64) -> (Int64, Int64) -> m () callAction slow fast = action (RI2State (snd slow) (snd fast)) loop where loop = \case @@ -77,13 +78,13 @@ withRetryLock2 RetryInterval2 {riSlow, riFast} lock action = wait delay = do waiting <- newTVarIO True _ <- liftIO . forkIO $ do - threadDelay delay + threadDelay' delay atomically $ whenM (readTVar waiting) $ void $ tryPutTMVar lock () atomically $ do takeTMVar lock writeTVar waiting False -nextDelay :: Int -> Int -> RetryInterval -> Int +nextDelay :: Int64 -> Int64 -> RetryInterval -> Int64 nextDelay elapsed delay RetryInterval {increaseAfter, maxInterval} = if elapsed < increaseAfter || delay == maxInterval then delay diff --git a/src/Simplex/Messaging/Agent/Store/SQLite.hs b/src/Simplex/Messaging/Agent/Store/SQLite.hs index e6a4b8cd4..99e87fc3f 100644 --- a/src/Simplex/Messaging/Agent/Store/SQLite.hs +++ b/src/Simplex/Messaging/Agent/Store/SQLite.hs @@ -845,7 +845,7 @@ getPendingMsgData db connId msgId = do WHERE m.conn_id = ? AND m.internal_id = ? |] (connId, msgId) - pendingMsgData :: (AgentMessageType, Maybe MsgFlags, MsgBody, InternalTs, Maybe Int, Maybe Int) -> PendingMsgData + pendingMsgData :: (AgentMessageType, Maybe MsgFlags, MsgBody, InternalTs, Maybe Int64, Maybe Int64) -> PendingMsgData pendingMsgData (msgType, msgFlags_, msgBody, internalTs, riSlow_, riFast_) = let msgFlags = fromMaybe SMP.noMsgFlags msgFlags_ msgRetryState = RI2State <$> riSlow_ <*> riFast_ @@ -1921,12 +1921,12 @@ getRcvFile db rcvFileId = runExceptT $ do |] (Only chunkId) where - toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Maybe Int, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> RcvFileChunkReplica + toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Maybe Int64, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> RcvFileChunkReplica toReplica (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries, host, port, keyHash) = let server = XFTPServer host port keyHash in RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, delay, retries} -updateRcvChunkReplicaDelay :: DB.Connection -> Int64 -> Int -> IO () +updateRcvChunkReplicaDelay :: DB.Connection -> Int64 -> Int64 -> IO () updateRcvChunkReplicaDelay db replicaId delay = do updatedAt <- getCurrentTime DB.execute db "UPDATE rcv_file_chunk_replicas SET delay = ?, retries = retries + 1, updated_at = ? WHERE rcv_file_chunk_replica_id = ?" (delay, updatedAt, replicaId) @@ -1989,7 +1989,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d |] (host, port, keyHash, RFSReceiving, cutoffTs) where - toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Maybe Int, Int)) -> RcvFileChunk + toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Maybe Int64, Int)) -> RcvFileChunk toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries)) = RcvFileChunk { rcvFileId, diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 7df7cf97d..02fcbfc2d 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -70,7 +70,6 @@ module Simplex.Messaging.Client ) where -import Control.Concurrent (threadDelay) import Control.Concurrent.Async import Control.Concurrent.STM import Control.Exception @@ -102,7 +101,7 @@ import Simplex.Messaging.Transport import Simplex.Messaging.Transport.Client (SocksProxy, TransportClientConfig (..), TransportHost (..), runTransportClient) import Simplex.Messaging.Transport.KeepAlive import Simplex.Messaging.Transport.WebSockets (WS) -import Simplex.Messaging.Util (bshow, raceAny_) +import Simplex.Messaging.Util (bshow, raceAny_, threadDelay') import Simplex.Messaging.Version import System.Timeout (timeout) @@ -171,7 +170,7 @@ data NetworkConfig = NetworkConfig -- | TCP keep-alive options, Nothing to skip enabling keep-alive tcpKeepAlive :: Maybe KeepAliveOpts, -- | period for SMP ping commands (microseconds, 0 to disable) - smpPingInterval :: Int, + smpPingInterval :: Int64, -- | the count of PING errors after which SMP client terminates (and will be reconnected), 0 to disable smpPingCount :: Int, logTLSErrors :: Bool @@ -348,7 +347,7 @@ getProtocolClient transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, ping :: ProtocolClient err msg -> IO () ping c@ProtocolClient {client_ = PClient {pingErrorCount}} = do - threadDelay smpPingInterval + threadDelay' smpPingInterval runExceptT (sendProtocolCommand c Nothing "" $ protocolPing @err @msg) >>= \case Left PCEResponseTimeout -> do cnt <- atomically $ stateTVar pingErrorCount $ \cnt -> (cnt + 1, cnt + 1) diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 1f230883b..ee5b3f508 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -19,6 +19,7 @@ import Control.Monad.Reader import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Functor (($>)) +import Data.Int (Int64) import Data.List (intercalate) import Data.Map.Strict (Map) import qualified Data.Text as T @@ -97,11 +98,11 @@ ntfServer cfg@NtfServerConfig {transports, logTLSErrors} started = do [logServerStats logStatsStartTime interval serverStatsLogFile] serverStatsThread_ _ = [] - logServerStats :: Int -> Int -> FilePath -> M () + logServerStats :: Int64 -> Int64 -> FilePath -> M () logServerStats startAt logInterval statsFilePath = do initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath - threadDelay $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) + liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) NtfServerStats {fromTime, tknCreated, tknVerified, tknDeleted, subCreated, subDeleted, ntfReceived, ntfDelivered, activeTokens, activeSubs} <- asks serverStats let interval = 1000000 * logInterval withFile statsFilePath AppendMode $ \h -> liftIO $ do @@ -136,7 +137,7 @@ ntfServer cfg@NtfServerConfig {transports, logTLSErrors} started = do weekCount sub, monthCount sub ] - threadDelay interval + threadDelay' interval resubscribe :: NtfSubscriber -> Map NtfSubscriptionId NtfSubData -> M () resubscribe NtfSubscriber {newSubQ} subs = do @@ -494,7 +495,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu atomically $ TM.insert tknId notifier intervalNotifiers where intervalNotifier delay = forever $ do - threadDelay delay + liftIO $ threadDelay' delay atomically $ writeTBQueue pushQ (tkn, PNCheckMessages) NtfReqNew corrId (ANE SSubscription newSub) -> do logDebug "SNEW - new subscription" diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs index 74eedcb17..3736dffb1 100644 --- a/src/Simplex/Messaging/Notifications/Server/Env.hs +++ b/src/Simplex/Messaging/Notifications/Server/Env.hs @@ -11,6 +11,7 @@ import Control.Concurrent.Async (Async) import Control.Monad.IO.Unlift import Crypto.Random import Data.ByteString.Char8 (ByteString) +import Data.Int (Int64) import Data.Time.Clock (getCurrentTime) import Data.Time.Clock.System (SystemTime) import Data.Word (Word16) @@ -52,8 +53,8 @@ data NtfServerConfig = NtfServerConfig privateKeyFile :: FilePath, certificateFile :: FilePath, -- stats config - see SMP server config - logStatsInterval :: Maybe Int, - logStatsStartTime :: Int, + logStatsInterval :: Maybe Int64, + logStatsStartTime :: Int64, serverStatsLogFile :: FilePath, serverStatsBackupFile :: Maybe FilePath, logTLSErrors :: Bool diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index b9f03ff9b..0854506b6 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -46,6 +46,7 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import Data.Either (fromRight, partitionEithers) import Data.Functor (($>)) +import Data.Int (Int64) import Data.List (intercalate) import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M @@ -157,7 +158,7 @@ smpServer started cfg@ServerConfig {transports, logTLSErrors} = do quota <- asks $ msgQueueQuota . config let interval = checkInterval expCfg * 1000000 forever $ do - threadDelay interval + liftIO $ threadDelay' interval old <- liftIO $ expireBeforeEpoch expCfg rIds <- M.keysSet <$> readTVarIO ms forM_ rIds $ \rId -> @@ -169,11 +170,11 @@ smpServer started cfg@ServerConfig {transports, logTLSErrors} = do [logServerStats logStatsStartTime interval serverStatsLogFile] serverStatsThread_ _ = [] - logServerStats :: Int -> Int -> FilePath -> M () + logServerStats :: Int64 -> Int64 -> FilePath -> M () logServerStats startAt logInterval statsFilePath = do initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath - threadDelay $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) + liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} <- asks serverStats let interval = 1000000 * logInterval withFile statsFilePath AppendMode $ \h -> liftIO $ do @@ -212,7 +213,7 @@ smpServer started cfg@ServerConfig {transports, logTLSErrors} = do show qCount', show msgCount' ] - threadDelay interval + threadDelay' interval runClient :: Transport c => TProxy c -> c -> M () runClient _ h = do @@ -292,7 +293,7 @@ disconnectTransport :: Transport c => THandle c -> client -> (client -> TVar Sys disconnectTransport THandle {connection} c activeAt expCfg = do let interval = checkInterval expCfg * 1000000 forever . liftIO $ do - threadDelay interval + threadDelay' interval old <- expireBeforeEpoch expCfg ts <- readTVarIO $ activeAt c when (systemSeconds ts < old) $ closeConnection connection diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 15253fec8..f0993fda0 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -9,6 +9,7 @@ import Control.Concurrent (ThreadId) import Control.Monad.IO.Unlift import Crypto.Random import Data.ByteString.Char8 (ByteString) +import Data.Int (Int64) import Data.List.NonEmpty (NonEmpty) import Data.Map.Strict (Map) import qualified Data.Map.Strict as M @@ -54,10 +55,10 @@ data ServerConfig = ServerConfig -- and check interval, seconds inactiveClientExpiration :: Maybe ExpirationConfig, -- | log SMP server usage statistics, only aggregates are logged, seconds - logStatsInterval :: Maybe Int, + logStatsInterval :: Maybe Int64, -- | time of the day when the stats are logged first, to log at consistent times, -- irrespective of when the server is started (seconds from 00:00 UTC) - logStatsStartTime :: Int, + logStatsStartTime :: Int64, -- | file to log stats serverStatsLogFile :: FilePath, -- | file to save and restore stats diff --git a/src/Simplex/Messaging/Server/Expiration.hs b/src/Simplex/Messaging/Server/Expiration.hs index 8f369d516..ae52d60fe 100644 --- a/src/Simplex/Messaging/Server/Expiration.hs +++ b/src/Simplex/Messaging/Server/Expiration.hs @@ -10,7 +10,7 @@ data ExpirationConfig = ExpirationConfig { -- time after which the entity can be expired, seconds ttl :: Int64, -- interval to check expiration, seconds - checkInterval :: Int + checkInterval :: Int64 } expireBeforeEpoch :: ExpirationConfig -> IO Int64 diff --git a/src/Simplex/Messaging/Util.hs b/src/Simplex/Messaging/Util.hs index a98175ff0..ef9e3ad74 100644 --- a/src/Simplex/Messaging/Util.hs +++ b/src/Simplex/Messaging/Util.hs @@ -1,8 +1,10 @@ +{-# LANGUAGE NumericUnderscores #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE ScopedTypeVariables #-} module Simplex.Messaging.Util where +import Control.Concurrent (threadDelay) import qualified Control.Exception as E import Control.Monad.Except import Control.Monad.IO.Unlift @@ -10,9 +12,13 @@ import Control.Monad.Trans.Except import Data.Bifunctor (first) import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B +import Data.Fixed (Fixed (MkFixed), Pico) +import Data.Int (Int64) import Data.Text (Text) import qualified Data.Text as T import Data.Text.Encoding (decodeUtf8With) +import Data.Time (nominalDiffTimeToSeconds) +import Data.Time.Clock (UTCTime, diffUTCTime) import UnliftIO.Async raceAny_ :: MonadUnliftIO m => [m a] -> m () @@ -102,3 +108,23 @@ safeDecodeUtf8 :: ByteString -> Text safeDecodeUtf8 = decodeUtf8With onError where onError _ _ = Just '?' + +threadDelay' :: Int64 -> IO () +threadDelay' time + | time <= 0 = pure () +threadDelay' time = do + let maxWait = min time $ fromIntegral (maxBound :: Int) + threadDelay $ fromIntegral maxWait + when (maxWait /= time) $ threadDelay' (time - maxWait) + +diffInSeconds :: UTCTime -> UTCTime -> Int64 +diffInSeconds a b = (`div` 1000000_000000) $ diffInPicos a b + +diffInMicros :: UTCTime -> UTCTime -> Int64 +diffInMicros a b = (`div` 1000000) $ diffInPicos a b + +diffInPicos :: UTCTime -> UTCTime -> Int64 +diffInPicos a b = fromInteger . fromPico . nominalDiffTimeToSeconds $ diffUTCTime a b + +fromPico :: Pico -> Integer +fromPico (MkFixed i) = i