mirror of
https://github.com/simplex-chat/simplexmq.git
synced 2026-05-25 14:14:54 +00:00
use Int64 for time calculations (#706)
This commit is contained in:
@@ -153,7 +153,7 @@ runXFTPWorker c srv doWork = do
|
||||
downloadFileChunk fc replica
|
||||
`catchError` retryOnError delay' loop (workerInternalError c rcvFileId rcvFileEntityId (Just fileTmpPath) . show)
|
||||
where
|
||||
retryOnError :: Int -> m () -> (AgentErrorType -> m ()) -> AgentErrorType -> m ()
|
||||
retryOnError :: Int64 -> m () -> (AgentErrorType -> m ()) -> AgentErrorType -> m ()
|
||||
retryOnError replicaDelay loop done e = do
|
||||
logError $ "XFTP worker error: " <> tshow e
|
||||
if temporaryAgentError e
|
||||
|
||||
@@ -22,6 +22,7 @@ import Data.ByteString.Builder (byteString)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.List (intercalate)
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
@@ -96,7 +97,7 @@ xftpServer cfg@XFTPServerConfig {xftpPort, logTLSErrors} started = do
|
||||
st <- asks store
|
||||
let interval = checkInterval expCfg * 1000000
|
||||
forever $ do
|
||||
threadDelay interval
|
||||
liftIO $ threadDelay' interval
|
||||
old <- liftIO $ expireBeforeEpoch expCfg
|
||||
sIds <- M.keysSet <$> readTVarIO (files st)
|
||||
forM_ sIds $ \sId -> do
|
||||
@@ -118,11 +119,11 @@ xftpServer cfg@XFTPServerConfig {xftpPort, logTLSErrors} started = do
|
||||
[logServerStats logStatsStartTime interval serverStatsLogFile]
|
||||
serverStatsThread_ _ = []
|
||||
|
||||
logServerStats :: Int -> Int -> FilePath -> M ()
|
||||
logServerStats :: Int64 -> Int64 -> FilePath -> M ()
|
||||
logServerStats startAt logInterval statsFilePath = do
|
||||
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
|
||||
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
|
||||
threadDelay $ 1_000_000 * (initialDelay + if initialDelay < 0 then 86_400 else 0)
|
||||
liftIO $ threadDelay' $ 1_000_000 * (initialDelay + if initialDelay < 0 then 86_400 else 0)
|
||||
FileServerStats {fromTime, filesCreated, fileRecipients, filesUploaded, filesDeleted, filesDownloaded, fileDownloads, fileDownloadAcks, filesCount, filesSize} <- asks serverStats
|
||||
let interval = 1_000_000 * logInterval
|
||||
forever $ do
|
||||
@@ -155,7 +156,7 @@ xftpServer cfg@XFTPServerConfig {xftpPort, logTLSErrors} started = do
|
||||
show filesCount',
|
||||
show filesSize'
|
||||
]
|
||||
threadDelay interval
|
||||
liftIO $ threadDelay' interval
|
||||
|
||||
data ServerFile = ServerFile
|
||||
{ filePath :: FilePath,
|
||||
|
||||
@@ -51,8 +51,8 @@ data XFTPServerConfig = XFTPServerConfig
|
||||
privateKeyFile :: FilePath,
|
||||
certificateFile :: FilePath,
|
||||
-- stats config - see SMP server config
|
||||
logStatsInterval :: Maybe Int,
|
||||
logStatsStartTime :: Int,
|
||||
logStatsInterval :: Maybe Int64,
|
||||
logStatsStartTime :: Int64,
|
||||
serverStatsLogFile :: FilePath,
|
||||
serverStatsBackupFile :: Maybe FilePath,
|
||||
logTLSErrors :: Bool
|
||||
|
||||
@@ -101,7 +101,7 @@ data RcvFileChunkReplica = RcvFileChunkReplica
|
||||
replicaId :: ChunkReplicaId,
|
||||
replicaKey :: C.APrivateSignKey,
|
||||
received :: Bool,
|
||||
delay :: Maybe Int,
|
||||
delay :: Maybe Int64,
|
||||
retries :: Int
|
||||
}
|
||||
deriving (Eq, Show)
|
||||
|
||||
@@ -1612,7 +1612,8 @@ subscriber c@AgentClient {msgQ} = forever $ do
|
||||
|
||||
cleanupManager :: forall m. AgentMonad' m => AgentClient -> m ()
|
||||
cleanupManager c@AgentClient {subQ} = do
|
||||
threadDelay =<< asks (initialCleanupDelay . config)
|
||||
delay <- asks (initialCleanupDelay . config)
|
||||
liftIO $ threadDelay' delay
|
||||
int <- asks (cleanupInterval . config)
|
||||
forever $ do
|
||||
void . runExceptT $ do
|
||||
@@ -1620,7 +1621,7 @@ cleanupManager c@AgentClient {subQ} = do
|
||||
deleteRcvFilesExpired `catchError` (notify "" . RFERR)
|
||||
deleteRcvFilesDeleted `catchError` (notify "" . RFERR)
|
||||
deleteRcvFilesTmpPaths `catchError` (notify "" . RFERR)
|
||||
threadDelay int
|
||||
liftIO $ threadDelay' int
|
||||
where
|
||||
deleteConns =
|
||||
withLock (deleteLock c) "cleanupManager" $ do
|
||||
|
||||
@@ -30,6 +30,7 @@ import Control.Monad.Except
|
||||
import Control.Monad.IO.Unlift
|
||||
import Control.Monad.Reader
|
||||
import Crypto.Random
|
||||
import Data.Int (Int64)
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import Data.Map (Map)
|
||||
import Data.Time.Clock (NominalDiffTime, nominalDay)
|
||||
@@ -79,8 +80,8 @@ data AgentConfig = AgentConfig
|
||||
messageRetryInterval :: RetryInterval2,
|
||||
messageTimeout :: NominalDiffTime,
|
||||
helloTimeout :: NominalDiffTime,
|
||||
initialCleanupDelay :: Int,
|
||||
cleanupInterval :: Int,
|
||||
initialCleanupDelay :: Int64,
|
||||
cleanupInterval :: Int64,
|
||||
rcvFilesTTL :: NominalDiffTime,
|
||||
xftpNotifyErrsOnRetry :: Bool,
|
||||
deleteErrorCount :: Int,
|
||||
|
||||
@@ -23,10 +23,9 @@ import Control.Monad
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.Reader
|
||||
import Data.Bifunctor (first)
|
||||
import Data.Fixed (Fixed (MkFixed), Pico)
|
||||
import qualified Data.Map.Strict as M
|
||||
import Data.Text (Text)
|
||||
import Data.Time (UTCTime, addUTCTime, diffUTCTime, getCurrentTime, nominalDiffTimeToSeconds)
|
||||
import Data.Time (UTCTime, addUTCTime, getCurrentTime)
|
||||
import Simplex.Messaging.Agent.Client
|
||||
import Simplex.Messaging.Agent.Env.SQLite
|
||||
import Simplex.Messaging.Agent.Protocol (ACommand (..), APartyCmd (..), AgentErrorType (..), BrokerErrorType (..), ConnId, NotificationsMode (..), SAEntity (..))
|
||||
@@ -40,7 +39,7 @@ import Simplex.Messaging.Notifications.Types
|
||||
import Simplex.Messaging.Protocol (NtfServer, ProtocolServer, SMPServer, sameSrvAddr)
|
||||
import Simplex.Messaging.TMap (TMap)
|
||||
import qualified Simplex.Messaging.TMap as TM
|
||||
import Simplex.Messaging.Util (tshow, unlessM)
|
||||
import Simplex.Messaging.Util (diffInMicros, threadDelay', tshow, unlessM)
|
||||
import System.Random (randomR)
|
||||
import UnliftIO
|
||||
import UnliftIO.Concurrent (forkIO, threadDelay)
|
||||
@@ -292,16 +291,10 @@ rescheduleAction doWork ts actionTs
|
||||
| otherwise = do
|
||||
void . atomically $ tryTakeTMVar doWork
|
||||
void . forkIO $ do
|
||||
threadDelay $ diffInMicros actionTs ts
|
||||
liftIO $ threadDelay' $ diffInMicros actionTs ts
|
||||
void . atomically $ tryPutTMVar doWork ()
|
||||
pure True
|
||||
|
||||
fromPico :: Pico -> Integer
|
||||
fromPico (MkFixed i) = i
|
||||
|
||||
diffInMicros :: UTCTime -> UTCTime -> Int
|
||||
diffInMicros a b = (`div` 1000000) . fromInteger . fromPico . nominalDiffTimeToSeconds $ diffUTCTime a b
|
||||
|
||||
retryOnError :: AgentMonad' m => AgentClient -> Text -> m () -> (AgentErrorType -> m ()) -> AgentErrorType -> m ()
|
||||
retryOnError c name loop done e = do
|
||||
logError $ name <> " error: " <> tshow e
|
||||
|
||||
@@ -14,16 +14,17 @@ module Simplex.Messaging.Agent.RetryInterval
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent (forkIO, threadDelay)
|
||||
import Control.Concurrent (forkIO)
|
||||
import Control.Monad (void)
|
||||
import Control.Monad.IO.Class (MonadIO, liftIO)
|
||||
import Simplex.Messaging.Util (whenM)
|
||||
import Data.Int (Int64)
|
||||
import Simplex.Messaging.Util (threadDelay', whenM)
|
||||
import UnliftIO.STM
|
||||
|
||||
data RetryInterval = RetryInterval
|
||||
{ initialInterval :: Int,
|
||||
increaseAfter :: Int,
|
||||
maxInterval :: Int
|
||||
{ initialInterval :: Int64,
|
||||
increaseAfter :: Int64,
|
||||
maxInterval :: Int64
|
||||
}
|
||||
|
||||
data RetryInterval2 = RetryInterval2
|
||||
@@ -32,8 +33,8 @@ data RetryInterval2 = RetryInterval2
|
||||
}
|
||||
|
||||
data RI2State = RI2State
|
||||
{ slowInterval :: Int,
|
||||
fastInterval :: Int
|
||||
{ slowInterval :: Int64,
|
||||
fastInterval :: Int64
|
||||
}
|
||||
deriving (Show)
|
||||
|
||||
@@ -47,14 +48,14 @@ updateRetryInterval2 RI2State {slowInterval, fastInterval} RetryInterval2 {riSlo
|
||||
data RetryIntervalMode = RISlow | RIFast
|
||||
deriving (Eq, Show)
|
||||
|
||||
withRetryInterval :: forall m. MonadIO m => RetryInterval -> (Int -> m () -> m ()) -> m ()
|
||||
withRetryInterval :: forall m. MonadIO m => RetryInterval -> (Int64 -> m () -> m ()) -> m ()
|
||||
withRetryInterval ri action = callAction 0 $ initialInterval ri
|
||||
where
|
||||
callAction :: Int -> Int -> m ()
|
||||
callAction :: Int64 -> Int64 -> m ()
|
||||
callAction elapsed delay = action delay loop
|
||||
where
|
||||
loop = do
|
||||
liftIO $ threadDelay delay
|
||||
liftIO $ threadDelay' delay
|
||||
let elapsed' = elapsed + delay
|
||||
callAction elapsed' $ nextDelay elapsed' delay ri
|
||||
|
||||
@@ -63,7 +64,7 @@ withRetryLock2 :: forall m. MonadIO m => RetryInterval2 -> TMVar () -> (RI2State
|
||||
withRetryLock2 RetryInterval2 {riSlow, riFast} lock action =
|
||||
callAction (0, initialInterval riSlow) (0, initialInterval riFast)
|
||||
where
|
||||
callAction :: (Int, Int) -> (Int, Int) -> m ()
|
||||
callAction :: (Int64, Int64) -> (Int64, Int64) -> m ()
|
||||
callAction slow fast = action (RI2State (snd slow) (snd fast)) loop
|
||||
where
|
||||
loop = \case
|
||||
@@ -77,13 +78,13 @@ withRetryLock2 RetryInterval2 {riSlow, riFast} lock action =
|
||||
wait delay = do
|
||||
waiting <- newTVarIO True
|
||||
_ <- liftIO . forkIO $ do
|
||||
threadDelay delay
|
||||
threadDelay' delay
|
||||
atomically $ whenM (readTVar waiting) $ void $ tryPutTMVar lock ()
|
||||
atomically $ do
|
||||
takeTMVar lock
|
||||
writeTVar waiting False
|
||||
|
||||
nextDelay :: Int -> Int -> RetryInterval -> Int
|
||||
nextDelay :: Int64 -> Int64 -> RetryInterval -> Int64
|
||||
nextDelay elapsed delay RetryInterval {increaseAfter, maxInterval} =
|
||||
if elapsed < increaseAfter || delay == maxInterval
|
||||
then delay
|
||||
|
||||
@@ -845,7 +845,7 @@ getPendingMsgData db connId msgId = do
|
||||
WHERE m.conn_id = ? AND m.internal_id = ?
|
||||
|]
|
||||
(connId, msgId)
|
||||
pendingMsgData :: (AgentMessageType, Maybe MsgFlags, MsgBody, InternalTs, Maybe Int, Maybe Int) -> PendingMsgData
|
||||
pendingMsgData :: (AgentMessageType, Maybe MsgFlags, MsgBody, InternalTs, Maybe Int64, Maybe Int64) -> PendingMsgData
|
||||
pendingMsgData (msgType, msgFlags_, msgBody, internalTs, riSlow_, riFast_) =
|
||||
let msgFlags = fromMaybe SMP.noMsgFlags msgFlags_
|
||||
msgRetryState = RI2State <$> riSlow_ <*> riFast_
|
||||
@@ -1921,12 +1921,12 @@ getRcvFile db rcvFileId = runExceptT $ do
|
||||
|]
|
||||
(Only chunkId)
|
||||
where
|
||||
toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Maybe Int, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> RcvFileChunkReplica
|
||||
toReplica :: (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Maybe Int64, Int, NonEmpty TransportHost, ServiceName, C.KeyHash) -> RcvFileChunkReplica
|
||||
toReplica (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries, host, port, keyHash) =
|
||||
let server = XFTPServer host port keyHash
|
||||
in RcvFileChunkReplica {rcvChunkReplicaId, server, replicaId, replicaKey, received, delay, retries}
|
||||
|
||||
updateRcvChunkReplicaDelay :: DB.Connection -> Int64 -> Int -> IO ()
|
||||
updateRcvChunkReplicaDelay :: DB.Connection -> Int64 -> Int64 -> IO ()
|
||||
updateRcvChunkReplicaDelay db replicaId delay = do
|
||||
updatedAt <- getCurrentTime
|
||||
DB.execute db "UPDATE rcv_file_chunk_replicas SET delay = ?, retries = retries + 1, updated_at = ? WHERE rcv_file_chunk_replica_id = ?" (delay, updatedAt, replicaId)
|
||||
@@ -1989,7 +1989,7 @@ getNextRcvChunkToDownload db server@ProtocolServer {host, port, keyHash} ttl = d
|
||||
|]
|
||||
(host, port, keyHash, RFSReceiving, cutoffTs)
|
||||
where
|
||||
toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Maybe Int, Int)) -> RcvFileChunk
|
||||
toChunk :: ((DBRcvFileId, RcvFileId, UserId, Int64, Int, FileSize Word32, FileDigest, FilePath, Maybe FilePath) :. (Int64, ChunkReplicaId, C.APrivateSignKey, Bool, Maybe Int64, Int)) -> RcvFileChunk
|
||||
toChunk ((rcvFileId, rcvFileEntityId, userId, rcvChunkId, chunkNo, chunkSize, digest, fileTmpPath, chunkTmpPath) :. (rcvChunkReplicaId, replicaId, replicaKey, received, delay, retries)) =
|
||||
RcvFileChunk
|
||||
{ rcvFileId,
|
||||
|
||||
@@ -70,7 +70,6 @@ module Simplex.Messaging.Client
|
||||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent (threadDelay)
|
||||
import Control.Concurrent.Async
|
||||
import Control.Concurrent.STM
|
||||
import Control.Exception
|
||||
@@ -102,7 +101,7 @@ import Simplex.Messaging.Transport
|
||||
import Simplex.Messaging.Transport.Client (SocksProxy, TransportClientConfig (..), TransportHost (..), runTransportClient)
|
||||
import Simplex.Messaging.Transport.KeepAlive
|
||||
import Simplex.Messaging.Transport.WebSockets (WS)
|
||||
import Simplex.Messaging.Util (bshow, raceAny_)
|
||||
import Simplex.Messaging.Util (bshow, raceAny_, threadDelay')
|
||||
import Simplex.Messaging.Version
|
||||
import System.Timeout (timeout)
|
||||
|
||||
@@ -171,7 +170,7 @@ data NetworkConfig = NetworkConfig
|
||||
-- | TCP keep-alive options, Nothing to skip enabling keep-alive
|
||||
tcpKeepAlive :: Maybe KeepAliveOpts,
|
||||
-- | period for SMP ping commands (microseconds, 0 to disable)
|
||||
smpPingInterval :: Int,
|
||||
smpPingInterval :: Int64,
|
||||
-- | the count of PING errors after which SMP client terminates (and will be reconnected), 0 to disable
|
||||
smpPingCount :: Int,
|
||||
logTLSErrors :: Bool
|
||||
@@ -348,7 +347,7 @@ getProtocolClient transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize,
|
||||
|
||||
ping :: ProtocolClient err msg -> IO ()
|
||||
ping c@ProtocolClient {client_ = PClient {pingErrorCount}} = do
|
||||
threadDelay smpPingInterval
|
||||
threadDelay' smpPingInterval
|
||||
runExceptT (sendProtocolCommand c Nothing "" $ protocolPing @err @msg) >>= \case
|
||||
Left PCEResponseTimeout -> do
|
||||
cnt <- atomically $ stateTVar pingErrorCount $ \cnt -> (cnt + 1, cnt + 1)
|
||||
|
||||
@@ -19,6 +19,7 @@ import Control.Monad.Reader
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.List (intercalate)
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Text as T
|
||||
@@ -97,11 +98,11 @@ ntfServer cfg@NtfServerConfig {transports, logTLSErrors} started = do
|
||||
[logServerStats logStatsStartTime interval serverStatsLogFile]
|
||||
serverStatsThread_ _ = []
|
||||
|
||||
logServerStats :: Int -> Int -> FilePath -> M ()
|
||||
logServerStats :: Int64 -> Int64 -> FilePath -> M ()
|
||||
logServerStats startAt logInterval statsFilePath = do
|
||||
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
|
||||
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
|
||||
threadDelay $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
|
||||
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
|
||||
NtfServerStats {fromTime, tknCreated, tknVerified, tknDeleted, subCreated, subDeleted, ntfReceived, ntfDelivered, activeTokens, activeSubs} <- asks serverStats
|
||||
let interval = 1000000 * logInterval
|
||||
withFile statsFilePath AppendMode $ \h -> liftIO $ do
|
||||
@@ -136,7 +137,7 @@ ntfServer cfg@NtfServerConfig {transports, logTLSErrors} started = do
|
||||
weekCount sub,
|
||||
monthCount sub
|
||||
]
|
||||
threadDelay interval
|
||||
threadDelay' interval
|
||||
|
||||
resubscribe :: NtfSubscriber -> Map NtfSubscriptionId NtfSubData -> M ()
|
||||
resubscribe NtfSubscriber {newSubQ} subs = do
|
||||
@@ -494,7 +495,7 @@ client NtfServerClient {rcvQ, sndQ} NtfSubscriber {newSubQ, smpAgent = ca} NtfPu
|
||||
atomically $ TM.insert tknId notifier intervalNotifiers
|
||||
where
|
||||
intervalNotifier delay = forever $ do
|
||||
threadDelay delay
|
||||
liftIO $ threadDelay' delay
|
||||
atomically $ writeTBQueue pushQ (tkn, PNCheckMessages)
|
||||
NtfReqNew corrId (ANE SSubscription newSub) -> do
|
||||
logDebug "SNEW - new subscription"
|
||||
|
||||
@@ -11,6 +11,7 @@ import Control.Concurrent.Async (Async)
|
||||
import Control.Monad.IO.Unlift
|
||||
import Crypto.Random
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import Data.Int (Int64)
|
||||
import Data.Time.Clock (getCurrentTime)
|
||||
import Data.Time.Clock.System (SystemTime)
|
||||
import Data.Word (Word16)
|
||||
@@ -52,8 +53,8 @@ data NtfServerConfig = NtfServerConfig
|
||||
privateKeyFile :: FilePath,
|
||||
certificateFile :: FilePath,
|
||||
-- stats config - see SMP server config
|
||||
logStatsInterval :: Maybe Int,
|
||||
logStatsStartTime :: Int,
|
||||
logStatsInterval :: Maybe Int64,
|
||||
logStatsStartTime :: Int64,
|
||||
serverStatsLogFile :: FilePath,
|
||||
serverStatsBackupFile :: Maybe FilePath,
|
||||
logTLSErrors :: Bool
|
||||
|
||||
@@ -46,6 +46,7 @@ import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Either (fromRight, partitionEithers)
|
||||
import Data.Functor (($>))
|
||||
import Data.Int (Int64)
|
||||
import Data.List (intercalate)
|
||||
import qualified Data.List.NonEmpty as L
|
||||
import qualified Data.Map.Strict as M
|
||||
@@ -157,7 +158,7 @@ smpServer started cfg@ServerConfig {transports, logTLSErrors} = do
|
||||
quota <- asks $ msgQueueQuota . config
|
||||
let interval = checkInterval expCfg * 1000000
|
||||
forever $ do
|
||||
threadDelay interval
|
||||
liftIO $ threadDelay' interval
|
||||
old <- liftIO $ expireBeforeEpoch expCfg
|
||||
rIds <- M.keysSet <$> readTVarIO ms
|
||||
forM_ rIds $ \rId ->
|
||||
@@ -169,11 +170,11 @@ smpServer started cfg@ServerConfig {transports, logTLSErrors} = do
|
||||
[logServerStats logStatsStartTime interval serverStatsLogFile]
|
||||
serverStatsThread_ _ = []
|
||||
|
||||
logServerStats :: Int -> Int -> FilePath -> M ()
|
||||
logServerStats :: Int64 -> Int64 -> FilePath -> M ()
|
||||
logServerStats startAt logInterval statsFilePath = do
|
||||
initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime
|
||||
liftIO $ putStrLn $ "server stats log enabled: " <> statsFilePath
|
||||
threadDelay $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
|
||||
liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0)
|
||||
ServerStats {fromTime, qCreated, qSecured, qDeleted, msgSent, msgRecv, activeQueues, msgSentNtf, msgRecvNtf, activeQueuesNtf, qCount, msgCount} <- asks serverStats
|
||||
let interval = 1000000 * logInterval
|
||||
withFile statsFilePath AppendMode $ \h -> liftIO $ do
|
||||
@@ -212,7 +213,7 @@ smpServer started cfg@ServerConfig {transports, logTLSErrors} = do
|
||||
show qCount',
|
||||
show msgCount'
|
||||
]
|
||||
threadDelay interval
|
||||
threadDelay' interval
|
||||
|
||||
runClient :: Transport c => TProxy c -> c -> M ()
|
||||
runClient _ h = do
|
||||
@@ -292,7 +293,7 @@ disconnectTransport :: Transport c => THandle c -> client -> (client -> TVar Sys
|
||||
disconnectTransport THandle {connection} c activeAt expCfg = do
|
||||
let interval = checkInterval expCfg * 1000000
|
||||
forever . liftIO $ do
|
||||
threadDelay interval
|
||||
threadDelay' interval
|
||||
old <- expireBeforeEpoch expCfg
|
||||
ts <- readTVarIO $ activeAt c
|
||||
when (systemSeconds ts < old) $ closeConnection connection
|
||||
|
||||
@@ -9,6 +9,7 @@ import Control.Concurrent (ThreadId)
|
||||
import Control.Monad.IO.Unlift
|
||||
import Crypto.Random
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import Data.Int (Int64)
|
||||
import Data.List.NonEmpty (NonEmpty)
|
||||
import Data.Map.Strict (Map)
|
||||
import qualified Data.Map.Strict as M
|
||||
@@ -54,10 +55,10 @@ data ServerConfig = ServerConfig
|
||||
-- and check interval, seconds
|
||||
inactiveClientExpiration :: Maybe ExpirationConfig,
|
||||
-- | log SMP server usage statistics, only aggregates are logged, seconds
|
||||
logStatsInterval :: Maybe Int,
|
||||
logStatsInterval :: Maybe Int64,
|
||||
-- | time of the day when the stats are logged first, to log at consistent times,
|
||||
-- irrespective of when the server is started (seconds from 00:00 UTC)
|
||||
logStatsStartTime :: Int,
|
||||
logStatsStartTime :: Int64,
|
||||
-- | file to log stats
|
||||
serverStatsLogFile :: FilePath,
|
||||
-- | file to save and restore stats
|
||||
|
||||
@@ -10,7 +10,7 @@ data ExpirationConfig = ExpirationConfig
|
||||
{ -- time after which the entity can be expired, seconds
|
||||
ttl :: Int64,
|
||||
-- interval to check expiration, seconds
|
||||
checkInterval :: Int
|
||||
checkInterval :: Int64
|
||||
}
|
||||
|
||||
expireBeforeEpoch :: ExpirationConfig -> IO Int64
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
{-# LANGUAGE NumericUnderscores #-}
|
||||
{-# LANGUAGE OverloadedStrings #-}
|
||||
{-# LANGUAGE ScopedTypeVariables #-}
|
||||
|
||||
module Simplex.Messaging.Util where
|
||||
|
||||
import Control.Concurrent (threadDelay)
|
||||
import qualified Control.Exception as E
|
||||
import Control.Monad.Except
|
||||
import Control.Monad.IO.Unlift
|
||||
@@ -10,9 +12,13 @@ import Control.Monad.Trans.Except
|
||||
import Data.Bifunctor (first)
|
||||
import Data.ByteString.Char8 (ByteString)
|
||||
import qualified Data.ByteString.Char8 as B
|
||||
import Data.Fixed (Fixed (MkFixed), Pico)
|
||||
import Data.Int (Int64)
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
import Data.Text.Encoding (decodeUtf8With)
|
||||
import Data.Time (nominalDiffTimeToSeconds)
|
||||
import Data.Time.Clock (UTCTime, diffUTCTime)
|
||||
import UnliftIO.Async
|
||||
|
||||
raceAny_ :: MonadUnliftIO m => [m a] -> m ()
|
||||
@@ -102,3 +108,23 @@ safeDecodeUtf8 :: ByteString -> Text
|
||||
safeDecodeUtf8 = decodeUtf8With onError
|
||||
where
|
||||
onError _ _ = Just '?'
|
||||
|
||||
threadDelay' :: Int64 -> IO ()
|
||||
threadDelay' time
|
||||
| time <= 0 = pure ()
|
||||
threadDelay' time = do
|
||||
let maxWait = min time $ fromIntegral (maxBound :: Int)
|
||||
threadDelay $ fromIntegral maxWait
|
||||
when (maxWait /= time) $ threadDelay' (time - maxWait)
|
||||
|
||||
diffInSeconds :: UTCTime -> UTCTime -> Int64
|
||||
diffInSeconds a b = (`div` 1000000_000000) $ diffInPicos a b
|
||||
|
||||
diffInMicros :: UTCTime -> UTCTime -> Int64
|
||||
diffInMicros a b = (`div` 1000000) $ diffInPicos a b
|
||||
|
||||
diffInPicos :: UTCTime -> UTCTime -> Int64
|
||||
diffInPicos a b = fromInteger . fromPico . nominalDiffTimeToSeconds $ diffUTCTime a b
|
||||
|
||||
fromPico :: Pico -> Integer
|
||||
fromPico (MkFixed i) = i
|
||||
|
||||
Reference in New Issue
Block a user