ntf server: better batching and logging (#780)

* ntf server: better batching and logging

* reduce batch delay for ntf server

* comments

* 5.1.3, ntf 1.4.2

* more logging

* more logging

* split large batches, more logging

* remove some logs
This commit is contained in:
Evgeny Poberezkin
2023-06-26 20:14:35 +01:00
committed by GitHub
parent 3a74558e84
commit 4a927d1ae2
19 changed files with 149 additions and 123 deletions
+61 -45
View File
@@ -16,16 +16,17 @@ import Control.Concurrent.STM (stateTVar)
import Control.Logger.Simple
import Control.Monad.Except
import Control.Monad.Reader
import Data.Bifunctor (second)
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Function (on)
import Data.Functor (($>))
import Data.Int (Int64)
import Data.List (intercalate)
import Data.List (intercalate, sort)
import Data.List.NonEmpty (NonEmpty(..))
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as M
import Data.Maybe (catMaybes)
import qualified Data.Text as T
import Data.Text.Encoding (decodeLatin1)
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime)
@@ -55,11 +56,10 @@ import System.Exit (exitFailure)
import System.IO (BufferMode (..), hPutStrLn, hSetBuffering)
import System.Mem.Weak (deRefWeak)
import UnliftIO (IOMode (..), async, uninterruptibleCancel, withFile)
import UnliftIO.Concurrent (forkIO, killThread, mkWeakThreadId, threadDelay)
import UnliftIO.Concurrent (forkIO, killThread, mkWeakThreadId)
import UnliftIO.Directory (doesFileExist, renameFile)
import UnliftIO.Exception
import UnliftIO.STM
import Data.Bifunctor (second)
runNtfServer :: NtfServerConfig -> IO ()
runNtfServer cfg = do
@@ -147,23 +147,28 @@ ntfServer cfg@NtfServerConfig {transports, logTLSErrors} started = do
resubscribe :: NtfSubscriber -> Map NtfSubscriptionId NtfSubData -> M ()
resubscribe NtfSubscriber {newSubQ} subs = do
subs' <- atomically $ filterM (fmap ntfShouldSubscribe . readTVar . subStatus) $ M.elems subs
mapM_ (atomically . writeTBQueue newSubQ . L.map NtfSub) $ L.nonEmpty subs'
liftIO $ logInfo "SMP connections resubscribed"
atomically . writeTBQueue newSubQ $ map NtfSub subs'
liftIO $ logInfo $ "SMP resubscriptions queued (" <> tshow (length subs') <> " subscriptions)"
ntfSubscriber :: NtfSubscriber -> M ()
ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = do
raceAny_ [subscribe, receiveSMP, receiveAgent]
where
subscribe :: M ()
subscribe = do
d <- asks $ resubscribeDelay . config
forever $ do
subs <- atomically (readTBQueue newSubQ)
let ss = L.groupBy ((==) `on` server) subs
forM_ ss $ \serverSubs -> do
SMPSubscriber {newSubQ = subscriberSubQ} <- getSMPSubscriber $ server $ L.head serverSubs
atomically $ writeTQueue subscriberSubQ serverSubs
when (length serverSubs > 10) $ threadDelay d
subscribe = forever $ do
subs <- atomically (readTBQueue newSubQ)
let ss = L.groupAllWith server subs
forM_ ss $ \serverSubs -> do
let srv = server $ L.head serverSubs
batches = toChunks 900 $ L.toList serverSubs
SMPSubscriber {newSubQ = subscriberSubQ} <- getSMPSubscriber srv
mapM_ (atomically . writeTQueue subscriberSubQ) batches
toChunks :: Int -> [a] -> [NonEmpty a]
toChunks _ [] = []
toChunks n xs =
let (ys, xs') = splitAt n xs
in maybe id (:) (L.nonEmpty ys) (toChunks n xs')
server :: NtfEntityRec 'Subscription -> SMPServer
server (NtfSub sub) = ntfSubServer sub
@@ -184,21 +189,26 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
forever $ do
subs <- atomically (peekTQueue subscriberSubQ)
let subs' = L.map (\(NtfSub sub) -> sub) subs
srv = server $ L.head subs
logSubStatus srv "subscribing" $ length subs
mapM_ (\NtfSubData {smpQueue} -> updateSubStatus smpQueue NSPending) subs'
rs <- liftIO $ subscribeQueues (server $ L.head subs) subs'
subs_ <- L.nonEmpty <$> foldM process [] rs
rs <- liftIO $ subscribeQueues srv subs'
(subs'', oks, errs) <- foldM process ([], 0, []) rs
atomically $ do
void $ readTQueue subscriberSubQ
mapM_ (writeTQueue subscriberSubQ . L.map NtfSub) subs_
mapM_ (writeTQueue subscriberSubQ . L.map NtfSub) $ L.nonEmpty subs''
logSubStatus srv "retrying" $ length subs''
logSubStatus srv "subscribed" oks
logSubErrors srv errs
where
process subs (sub@NtfSubData {smpQueue}, r) = case r of
Right _ -> updateSubStatus smpQueue NSActive $> subs
Left err -> do
handleSubError smpQueue err
pure $ case err of
PCEResponseTimeout -> sub : subs
PCENetworkError -> sub : subs
_ -> subs
process :: ([NtfSubData], Int, [NtfSubStatus]) -> (NtfSubData, Either SMPClientError ()) -> M ([NtfSubData], Int, [NtfSubStatus])
process (subs, oks, errs) (sub@NtfSubData {smpQueue}, r) = case r of
Right _ -> updateSubStatus smpQueue NSActive $> (subs, oks + 1, errs)
Left e -> update <$> handleSubError smpQueue e
where
update = \case
Just err -> (subs, oks, err : errs) -- permanent error, log and don't retry subscription
Nothing -> (sub : subs, oks, errs) -- temporary error, retry subscription
-- | Subscribe to queues. The list of results can have a different order.
subscribeQueues :: SMPServer -> NonEmpty NtfSubData -> IO (NonEmpty (NtfSubData, Either SMPClientError ()))
@@ -230,37 +240,43 @@ ntfSubscriber NtfSubscriber {smpSubscribers, newSubQ, smpAgent = ca@SMPClientAge
atomically (readTBQueue agentQ) >>= \case
CAConnected _ -> pure ()
CADisconnected srv subs -> do
logInfo $ "SMP server disconnected " <> showServer' srv <> " (" <> tshow (length subs) <> ") subscriptions"
logSubStatus srv "disconnected" $ length subs
forM_ subs $ \(_, ntfId) -> do
let smpQueue = SMPQueueNtf srv ntfId
updateSubStatus smpQueue NSInactive
CAReconnected srv ->
logInfo $ "SMP server reconnected " <> showServer' srv
CAResubscribed srv sub -> do
let ntfId = snd sub
smpQueue = SMPQueueNtf srv ntfId
updateSubStatus smpQueue NSActive
CASubError srv (_, ntfId) err -> do
logError $ "SMP subscription error on server " <> showServer' srv <> ": " <> tshow err
handleSubError (SMPQueueNtf srv ntfId) err
where
showServer' = decodeLatin1 . strEncode . host
CAResubscribed srv subs -> do
forM_ subs $ \(_, ntfId) -> updateSubStatus (SMPQueueNtf srv ntfId) NSActive
logSubStatus srv "resubscribed" $ length subs
CASubError srv errs ->
forM errs (\((_, ntfId), err) -> handleSubError (SMPQueueNtf srv ntfId) err)
>>= logSubErrors srv . catMaybes . L.toList
handleSubError :: SMPQueueNtf -> SMPClientError -> M ()
logSubStatus srv event n = when (n > 0) $
logInfo $ "SMP server " <> event <> " " <> showServer' srv <> " (" <> tshow n <> " subscriptions)"
logSubErrors :: SMPServer -> [NtfSubStatus] -> M ()
logSubErrors srv errs = forM_ (L.group $ sort errs) $ \errs' -> do
logError $ "SMP subscription errors on server " <> showServer' srv <> ": " <> tshow (L.head errs') <> " (" <> tshow (length errs') <> " errors)"
showServer' = decodeLatin1 . strEncode . host
handleSubError :: SMPQueueNtf -> SMPClientError -> M (Maybe NtfSubStatus)
handleSubError smpQueue = \case
PCEProtocolError AUTH -> updateSubStatus smpQueue NSAuth
PCEProtocolError AUTH -> updateSubStatus smpQueue NSAuth $> Just NSAuth
PCEProtocolError e -> updateErr "SMP error " e
PCEIOError e -> updateErr "IOError " e
PCEResponseError e -> updateErr "ResponseError " e
PCEUnexpectedResponse r -> updateErr "UnexpectedResponse " r
PCETransportError e -> updateErr "TransportError " e
PCECryptoError e -> updateErr "CryptoError " e
PCEIncompatibleHost -> updateSubStatus smpQueue $ NSErr "IncompatibleHost"
PCEResponseTimeout -> pure ()
PCENetworkError -> pure ()
PCEIncompatibleHost -> let e = NSErr "IncompatibleHost" in updateSubStatus smpQueue e $> Just e
PCEResponseTimeout -> pure Nothing
PCENetworkError -> pure Nothing
PCEIOError _ -> pure Nothing
where
updateErr :: Show e => ByteString -> e -> M ()
updateErr errType e = updateSubStatus smpQueue . NSErr $ errType <> bshow e
updateErr :: Show e => ByteString -> e -> M (Maybe NtfSubStatus)
updateErr errType e = updateSubStatus smpQueue (NSErr $ errType <> bshow e) $> Just (NSErr errType)
updateSubStatus smpQueue status = do
st <- asks store
@@ -354,7 +370,7 @@ receive th NtfServerClient {rcvQ, sndQ, activeAt} = forever $ do
send :: Transport c => THandle c -> NtfServerClient -> IO ()
send h@THandle {thVersion = v} NtfServerClient {sndQ, sessionId, activeAt} = forever $ do
t <- atomically $ readTBQueue sndQ
void . liftIO $ tPut h [(Nothing, encodeTransmission v sessionId t)]
void . liftIO $ tPut h Nothing [(Nothing, encodeTransmission v sessionId t)]
atomically . writeTVar activeAt =<< liftIO getSystemTime
-- instance Show a => Show (TVar a) where