Haskell file streaming causes a memory leak

vhmi4jdf · posted 2022-11-14 in Other

I'm fairly new to Haskell and I'm working on an existing codebase that collects files from a file share. Conduit is used to process the file shares in parallel; the scaffolding is based on this tutorial. To read the file share continuously, I added a delay and a recursive call to the streamFile function. I'm not sure whether that is the cause, but memory allocation keeps growing, up to several gigabytes.

What could be causing the memory leak?

module FileScraper(runFileScraperFinal, FileScraper, watch, watchDirectories) where

import           Actions                         (PostProcAction)
import           Colog                           (LogAction, Msg, Severity)
import           Conduit                         (ConduitM, ConduitT, MonadIO (..), MonadResource, MonadTrans (lift), MonadUnliftIO (withRunInIO), ResourceT, await, bracketP, mapMC, mapM_C, runConduit, runResourceT, yield, (.|), takeWhileC)
import           Control.Concurrent              (threadDelay)
import qualified Control.Concurrent.Async        as Async
import qualified Control.Concurrent.STM          as STM
import qualified Control.Concurrent.STM.TBMQueue as STM
import           Data.ByteString                 (ByteString, readFile)
import           Data.Conduit.Combinators        (filterM, yieldMany)
import           Data.Functor                    ((<&>))
import           Data.Text                       (Text, unpack)
import           Filters                         (FileFilter, DirectoryFilter)
import           Polysemy                        (Final, Inspector (inspect), Member, Sem, makeSem)
import           Polysemy.Final                  (bindS, getInitialStateS, getInspectorS, interpretFinal, liftS)
import           Prelude                         hiding (filter, init, readFile)
import           System.FilePath.Find            (find, RecursionPredicate, (/~?), filePath, (&&?), (==?), fileType, FileType (RegularFile), always)
import           System.Posix                    (raiseSignal, sigTERM)

data FileScraper m a where
  Watch :: [(Text, Text, FileFilter, DirectoryFilter, PostProcAction)] -> (FilePath -> ByteString -> Text -> PostProcAction -> m Bool) -> FileScraper m ()

makeSem ''FileScraper

runFileScraperFinal :: forall m. (MonadUnliftIO m => forall r a. (Member (Final m) r) => LogAction m (Msg Severity) -> Sem (FileScraper ': r) a -> Sem r a)
runFileScraperFinal _ = do
  interpretFinal @m (\case
    Watch sources callback -> do
      is <- getInitialStateS
      ins <- getInspectorS
      cb' <- bindS $ uncurry4 callback
      liftS $ withRunInIO $ \runInIO -> liftIO $ do
        runResourceT . runConduit $ watchDirectories sources .| mapMC (\(fp,fc,dest,ppa) -> lift $ do
          eff <- runInIO $ cb' ((fp,fc,dest,ppa) <$ is)
          case inspect ins eff of
            Nothing -> do
              raiseSignal sigTERM
              pure False
            Just v -> do
              pure v
          ) .| takeWhileC id .| mapM_C (const $ pure ())
    )

uncurry4 :: (a -> b -> c -> d -> e) -> ((a, b, c, d) -> e)
uncurry4 f ~(a,b,c,d) = f a b c d

watchDirectories :: MonadResource m => [(Text, Text, FileFilter, DirectoryFilter, PostProcAction)] -> ConduitM a (FilePath, ByteString, Text, PostProcAction) m ()
watchDirectories sourceToFilterMap = parSources (fmap (\(src, dest, filter, dirFilter, postProcActions) -> streamFile (unpack src) dest filter dirFilter postProcActions) sourceToFilterMap)

streamFile :: MonadResource m => FilePath -> Text -> FileFilter -> DirectoryFilter -> PostProcAction -> ConduitM a (FilePath, ByteString, Text, PostProcAction) m ()
streamFile baseDir destination filter dirFilter postProcActions = do
    newFiles <- liftIO $ find (recursionPredicate dirFilter) (fileType ==? RegularFile) baseDir
    yieldMany newFiles .| filterM (liftIO . filter) .| mapMC (\entry -> do
      liftIO $ readFile entry <&> (entry,,destination,postProcActions))
    let minutes :: Int = 60_000_000
    liftIO $ threadDelay (5 * minutes)
    streamFile baseDir destination filter dirFilter postProcActions
    where
      recursionPredicate :: DirectoryFilter -> RecursionPredicate
      recursionPredicate df = case df of
        [] -> always
        excludes -> foldl1 (&&?) $ map ((/~?) filePath . unpack) excludes

parSources :: (MonadResource m, Foldable f) => f (ConduitM () o (ResourceT IO) ()) -> ConduitT i o m ()
parSources sources = bracketP init cleanup finalSource
  where
    init = do
        -- create the queue where all sources will put their items
        queue <- STM.newTBMQueueIO 100

        -- In a separate thread, run concurrently all conduits
        a <- Async.async $ do
            Async.mapConcurrently_ (\source -> runResourceT $ runConduit (source .| sinkQueue queue)) sources
            -- once all conduits are done, close the queue
            STM.atomically (STM.closeTBMQueue queue)
        pure (a, queue)
    cleanup (async, queue) = do
        -- upon exception or cancellation, close the queue and cancel the threads
        STM.atomically (STM.closeTBMQueue queue)
        Async.cancel async
    finalSource (_, queue) = sourceQueue queue

sourceQueue :: MonadIO m => STM.TBMQueue o -> ConduitT i o m ()
sourceQueue queue = do
        mbItem <- liftIO $ STM.atomically (STM.readTBMQueue queue)
        case mbItem of
            Nothing -> pure ()  -- queue closed
            Just item -> yield item *> sourceQueue queue

sinkQueue :: MonadIO m => STM.TBMQueue a -> ConduitT a o m ()
sinkQueue queue = do
        mbItem <- await
        case mbItem of
            Nothing -> pure ()  -- no more items to come
            Just item -> do
                liftIO $ STM.atomically (STM.writeTBMQueue queue item)
                sinkQueue queue

Update (added the function that uses the callback):

...
void $ async $ watch normalisedPrefixedSources (\fp content dest ppa -> do
    log Info $ "Sending file " <> pack fp

    result <- await =<< send (unpack dest) content

    case result of
      Just True -> do
        log Info $ "File sent " <> pack fp
        res <- embed @m $ liftIO $ ppa fp
        if res then pure True else do
          log Error "Raise signal for graceful shutdown."
          embed @m $ liftIO $ raiseSignal sigTERM
          pure False
      _ -> do
        log Error $ "Error sending file " <> pack fp <> ". Raise signal for graceful shutdown."
        embed @m $ liftIO $ raiseSignal sigTERM
        pure False
    )
...

Update 2: After removing the idempotent filter from the configuration (with @K. A. Buhr's changes still in place), memory consumption remains constant.

type FileFilter = FilePath -> IO Bool

createIdempotentFilter :: LogAction IO Message -> M.Idempotent -> IO FileFilter
createIdempotentFilter la filterConfig = do
    cache <- newIORef []
    let configuredCacheSize :: Int = fromIntegral $ M.lruCacheSize filterConfig
    pure $ \path -> do
        fileModificationEpoch <- getModificationTime path
        cache' <- readIORef cache
        if (path, fileModificationEpoch) `elem` cache' then do
            la <& logText Debug ("File already in cache " <> pack path <> " | " <> pack (show fileModificationEpoch))
            pure False
        else do
            la <& logText Debug ("File not in cache " <> pack path <> " | " <> pack (show fileModificationEpoch))
            let alreadyScanned' = cache' <> [(path, fileModificationEpoch)]
            writeIORef cache $ drop (length alreadyScanned' - configuredCacheSize) alreadyScanned'
            pure True

Is there any problematic code in createIdempotentFilter that could be causing the memory leak?

htrmnn0y · Answer 1

First, make sure you rule out the ByteStrings of file contents as a source of the leak. The maximum number of files in flight equals the length of the bounded queue, so the high-water mark will be the contents of an arbitrary collection of 100 files from the input filesystems. If you are processing filesystems containing large video/image files, you could see erratic, high memory usage from that alone. In addition, if the callback holds references to the pathnames and/or contents of some or all of those files, that would cause a very severe space leak. You can rule all of this out by replacing readFile entry with return mempty and using a dummy callback (\_ _ _ _ -> return True).
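For example, a minimal sketch of that diagnostic change, reusing the names from the question's streamFile and watch call (exactly where the stubs go is an assumption about the surrounding code):

-- inside streamFile: stub out the file read so no file contents are retained
yieldMany newFiles
  .| filterM (liftIO . filter)
  .| mapMC (\entry -> pure (entry, mempty, destination, postProcActions))

-- at the watch call site: a dummy callback that keeps no references
void $ async $ watch normalisedPrefixedSources (\_ _ _ _ -> return True)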
After making a similar change myself, I was able to reproduce your space leak and track it down to two technical issues.
The first is:

.| takeWhileC id .| mapM_C (const $ pure ())

Replacing it with:

.| Control.Monad.void andC

reduced the maximum residency for a single pass over my test filesystem from 130 MB to 15 MB, though heap usage still showed the telltale linear increase on a heap profile.
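As a self-contained illustration (a toy sketch, not from the original answer) that the new sink is a drop-in replacement for the old one on a stream of Bools:

import Conduit (andC, mapM_C, runConduit, takeWhileC, (.|))
import Control.Monad (void)
import Data.Conduit.Combinators (yieldMany)

main :: IO ()
main = do
  -- old sink: pass Bools through until the first False, discarding them
  runConduit $ yieldMany [True, True, False, True] .| takeWhileC id .| mapM_C (const (pure ()))
  -- suggested sink: fold the Bools with (&&) and throw the result away
  runConduit $ yieldMany [True, True, False, True] .| void andC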
The second is:

yield item *> sourceQueue queue

Replacing it with:

yield item >> sourceQueue queue

eliminated the leak completely. Maximum residency was only 2 MB, and there was no discernible leak on heap profiles across multiple passes over the test filesystem.
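For reference, this is the question's sourceQueue with only that one operator changed (no other modifications):

sourceQueue :: MonadIO m => STM.TBMQueue o -> ConduitT i o m ()
sourceQueue queue = do
        mbItem <- liftIO $ STM.atomically (STM.readTBMQueue queue)
        case mbItem of
            Nothing -> pure ()  -- queue closed
            Just item -> yield item >> sourceQueue queue  -- >> instead of *>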
For both of these issues, I'm not entirely sure what's actually going on. The *> versus >> problem is one I've run into before. Although the two are semantically equivalent, they don't necessarily share the same implementation, and *> sometimes leaks space where >> does not. The takeWhileC issue, however, is a mystery to me.
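To make the "*> versus >>" point a bit more concrete, here is a paraphrase of the default definitions from base's Applicative and Monad classes (renamed so the snippet stands alone; concrete instances such as ConduitT's are free to override either default):

-- default (*>) from Applicative: built from <$ and <*>, which can retain
-- intermediate structure in some instances
starThen :: Applicative f => f a -> f b -> f b
starThen a1 a2 = (id <$ a1) <*> a2

-- default (>>) from Monad: a plain bind that discards the first result
thenDo :: Monad m => m a -> m b -> m b
thenDo m k = m >>= \_ -> k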
