I'm fairly new to Haskell and I'm working on an existing code base that collects files from file shares. Conduit is used to process the file shares in parallel; the scaffolding is based on this tutorial. To read the file shares continuously, I added a delay and a recursive call to the streamFile function. I'm not sure whether that is the problem, but memory usage keeps growing, up to several gigabytes.
What could be causing the memory leak?
module FileScraper(runFileScraperFinal, FileScraper, watch, watchDirectories) where
import Actions (PostProcAction)
import Colog (LogAction, Msg, Severity)
import Conduit (ConduitM, ConduitT, MonadIO (..), MonadResource, MonadTrans (lift), MonadUnliftIO (withRunInIO), ResourceT, await, bracketP, mapMC, mapM_C, runConduit, runResourceT, yield, (.|), takeWhileC)
import Control.Concurrent (threadDelay)
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TBMQueue as STM
import Data.ByteString (ByteString, readFile)
import Data.Conduit.Combinators (filterM, yieldMany)
import Data.Functor ((<&>))
import Data.Text (Text, unpack)
import Filters (FileFilter, DirectoryFilter)
import Polysemy (Final, Inspector (inspect), Member, Sem, makeSem)
import Polysemy.Final (bindS, getInitialStateS, getInspectorS, interpretFinal, liftS)
import Prelude hiding (filter, init, readFile)
import System.FilePath.Find (find, RecursionPredicate, (/~?), filePath, (&&?), (==?), fileType, FileType (RegularFile), always)
import System.Posix (raiseSignal, sigTERM)
data FileScraper m a where
  Watch :: [(Text, Text, FileFilter, DirectoryFilter, PostProcAction)] -> (FilePath -> ByteString -> Text -> PostProcAction -> m Bool) -> FileScraper m ()
makeSem ''FileScraper
runFileScraperFinal :: forall m. (MonadUnliftIO m => forall r a. (Member (Final m) r) => LogAction m (Msg Severity) -> Sem (FileScraper ': r) a -> Sem r a)
runFileScraperFinal _ = do
  interpretFinal @m (\case
    Watch sources callback -> do
      is <- getInitialStateS
      ins <- getInspectorS
      cb' <- bindS $ uncurry4 callback
      liftS $ withRunInIO $ \runInIO -> liftIO $ do
        runResourceT . runConduit $ watchDirectories sources .| mapMC (\(fp,fc,dest,ppa) -> lift $ do
          eff <- runInIO $ cb' ((fp,fc,dest,ppa) <$ is)
          case inspect ins eff of
            Nothing -> do
              raiseSignal sigTERM
              pure False
            Just v -> do
              pure v
          ) .| takeWhileC id .| mapM_C (const $ pure ())
    )
uncurry4 :: (a -> b -> c -> d -> e) -> ((a, b, c, d) -> e)
uncurry4 f ~(a,b,c,d) = f a b c d
watchDirectories :: MonadResource m => [(Text, Text, FileFilter, DirectoryFilter, PostProcAction)] -> ConduitM a (FilePath, ByteString, Text, PostProcAction) m ()
watchDirectories sourceToFilterMap = parSources (fmap (\(src, dest, filter, dirFilter, postProcActions) -> streamFile (unpack src) dest filter dirFilter postProcActions) sourceToFilterMap)
streamFile :: MonadResource m => FilePath -> Text -> FileFilter -> DirectoryFilter -> PostProcAction -> ConduitM a (FilePath, ByteString, Text, PostProcAction) m ()
streamFile baseDir destination filter dirFilter postProcActions = do
  newFiles <- liftIO $ find (recursionPredicate dirFilter) (fileType ==? RegularFile) baseDir
  yieldMany newFiles .| filterM (liftIO . filter) .| mapMC (\entry -> do
    liftIO $ readFile entry <&> (entry,,destination,postProcActions))
  let minutes :: Int = 60_000_000
  liftIO $ threadDelay (5 * minutes)
  streamFile baseDir destination filter dirFilter postProcActions
  where
    recursionPredicate :: DirectoryFilter -> RecursionPredicate
    recursionPredicate df = case df of
      [] -> always
      excludes -> foldl1 (&&?) $ map ((/~?) filePath . unpack) excludes
parSources :: (MonadResource m, Foldable f) => f (ConduitM () o (ResourceT IO) ()) -> ConduitT i o m ()
parSources sources = bracketP init cleanup finalSource
  where
    init = do
      -- create the queue where all sources will put their items
      queue <- STM.newTBMQueueIO 100
      -- In a separate thread, run concurrently all conduits
      a <- Async.async $ do
        Async.mapConcurrently_ (\source -> runResourceT $ runConduit (source .| sinkQueue queue)) sources
        -- once all conduits are done, close the queue
        STM.atomically (STM.closeTBMQueue queue)
      pure (a, queue)
    cleanup (async, queue) = do
      -- upon exception or cancellation, close the queue and cancel the threads
      STM.atomically (STM.closeTBMQueue queue)
      Async.cancel async
    finalSource (_, queue) = sourceQueue queue
sourceQueue :: MonadIO m => STM.TBMQueue o -> ConduitT i o m ()
sourceQueue queue = do
  mbItem <- liftIO $ STM.atomically (STM.readTBMQueue queue)
  case mbItem of
    Nothing -> pure () -- queue closed
    Just item -> yield item *> sourceQueue queue
sinkQueue :: MonadIO m => STM.TBMQueue a -> ConduitT a o m ()
sinkQueue queue = do
  mbItem <- await
  case mbItem of
    Nothing -> pure () -- no more items to come
    Just item -> do
      liftIO $ STM.atomically (STM.writeTBMQueue queue item)
      sinkQueue queue
Update (added the function that uses the callback):
...
void $ async $ watch normalisedPrefixedSources (\fp content dest ppa -> do
  log Info $ "Sending file " <> pack fp
  result <- await =<< send (unpack dest) content
  case result of
    Just True -> do
      log Info $ "File sent " <> pack fp
      res <- embed @m $ liftIO $ ppa fp
      if res then pure True else do
        log Error "Raise signal for graceful shutdown."
        embed @m $ liftIO $ raiseSignal sigTERM
        pure False
    _ -> do
      log Error $ "Error sending file " <> pack fp <> ". Raise signal for graceful shutdown."
      embed @m $ liftIO $ raiseSignal sigTERM
      pure False
  )
...
Update 2: After removing the idempotent filter from the configuration (with @K. A. Buhr's changes still applied), memory consumption remains unchanged.
type FileFilter = FilePath -> IO Bool

createIdempotentFilter :: LogAction IO Message -> M.Idempotent -> IO FileFilter
createIdempotentFilter la filterConfig = do
  cache <- newIORef []
  let configuredCacheSize :: Int = fromIntegral $ M.lruCacheSize filterConfig
  pure $ \path -> do
    fileModificationEpoch <- getModificationTime path
    cache' <- readIORef cache
    if (path, fileModificationEpoch) `elem` cache' then do
      la <& logText Debug ("File already in cache " <> pack path <> " | " <> pack (show fileModificationEpoch))
      pure False
    else do
      la <& logText Debug ("File not in cache " <> pack path <> " | " <> pack (show fileModificationEpoch))
      let alreadyScanned' = cache' <> [(path, fileModificationEpoch)]
      writeIORef cache $ drop (length alreadyScanned' - configuredCacheSize) alreadyScanned'
      pure True
Is there any problematic code in the createIdempotentFilter function that could cause a memory leak?
1 Answer
First, make sure you rule out the ByteString file contents as a possible source of the leak. The number of files in flight is bounded by the length of the bounded queue, so the high-water mark will be the contents of some arbitrary collection of 100 files from your input filesystems. If those filesystems contain large video or image files, you could see erratic, high memory usage from that alone. Moreover, if the callback holds on to references to the pathnames and/or contents of some or all of those files, that would cause a very severe space leak. You can rule all of this out by replacing readFile entry with return mempty and using a no-op callback (\_ _ _ _ -> return True).
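A minimal sketch of that diagnostic change, applied to the streamFile pipeline and the watch call site shown above (the ByteString annotation on mempty is only there to keep the snippet self-contained):
-- In streamFile: yield an empty ByteString instead of reading the file contents.
yieldMany newFiles .| filterM (liftIO . filter) .| mapMC (\entry ->
  liftIO $ pure (mempty :: ByteString) <&> (entry,,destination,postProcActions))

-- At the call site: a no-op callback that always reports success.
void $ async $ watch normalisedPrefixedSources (\_ _ _ _ -> pure True)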
After making a similar change myself, I was able to reproduce your space leak and traced it down to two technical issues.
The first was:
Replacing it with:
reduced the maximum residency for a single pass over my test filesystem from 130 MB to 15 MB, but heap usage still showed the typical linear increase on the heap profile.
The second was:
Replacing it with:
eliminated the leak completely. Maximum residency was only 2 MB, and there was no discernible leak on the heap profile over multiple passes of the test filesystem.
In both cases, I'm not entirely sure what's actually going on. The issue of *> versus >> is one I've run into before: although they are semantically equivalent, they don't necessarily share the same implementation, and *> sometimes leaks space where >> doesn't. The takeWhileC issue, however, is a mystery to me.
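As a sketch of the *>-versus->> point (this shows only that one substitution, and is not necessarily the exact snippet referred to above), here is what the change would look like in the sourceQueue from the question:
sourceQueue :: MonadIO m => STM.TBMQueue o -> ConduitT i o m ()
sourceQueue queue = do
  mbItem <- liftIO $ STM.atomically (STM.readTBMQueue queue)
  case mbItem of
    Nothing   -> pure ()                          -- queue closed
    Just item -> yield item >> sourceQueue queue  -- >> instead of *>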