如何处理spark结构化流媒体中的小文件问题?

z2acfund  于 2023-02-24  发布在  Apache
关注(0)|答案(4)|浏览(155)

我的项目中有一个场景,我正在使用spark-sql-2.4.1版本阅读Kafka主题信息,我可以使用结构化流处理一天的工作,一旦数据被接收,处理后我需要将数据保存到hdfs存储中的相应parquet文件中。
我能够存储和读取 parquet 文件,我保持了15秒到1分钟的触发时间。这些文件的大小非常小,因此导致许多文件。
这些Parquet文件需要稍后由hive查询读取。
那么1)这个策略在生产环境中有效吗?或者它以后会导致任何小文件问题吗?
2)处理/设计此类场景的最佳实践(即行业标准)是什么?
3)这类事情在生产中一般是如何处理的?
谢谢你。

pftdvrlh

pftdvrlh1#

我知道这个问题太老了。我也有过类似的问题&我已经使用Spark结构化流查询侦听器来解决这个问题。
我的用例是从Kafka中提取数据,并存储在带有年、月、日和小时分区的hdfs中。
下面的代码将采取前一个小时的分区数据,应用重新分区和覆盖现有分区中的数据.

val session = SparkSession.builder().master("local[2]").enableHiveSupport().getOrCreate()
session.streams.addListener(AppListener(config,session))

class AppListener(config: Config,spark: SparkSession) extends StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    this.synchronized {AppListener.mergeFiles(event.progress.timestamp,spark,config)}
  }
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}

object AppListener {

  def mergeFiles(currentTs: String,spark: SparkSession,config:Config):Unit = {
    val configs = config.kafka(config.key.get)
    if(currentTs.datetime.isAfter(Processed.ts.plusMinutes(5))) {

      println(
        s"""
           |Current Timestamp     :     ${currentTs}
           |Merge Files           :     ${Processed.ts.minusHours(1)}
           |
           |""".stripMargin)

      val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
      val ts = Processed.ts.minusHours(1)
      val hdfsPath = s"${configs.hdfsLocation}/year=${ts.getYear}/month=${ts.getMonthOfYear}/day=${ts.getDayOfMonth}/hour=${ts.getHourOfDay}"
      val path = new Path(hdfsPath)

      if(fs.exists(path)) {

      val hdfsFiles = fs.listLocatedStatus(path)
        .filter(lfs => lfs.isFile && !lfs.getPath.getName.contains("_SUCCESS"))
        .map(_.getPath).toList

      println(
        s"""
           |Total files in HDFS location  : ${hdfsFiles.length}
           | ${hdfsFiles.length > 1}
           |""".stripMargin)

      if(hdfsFiles.length > 1) {

          println(
            s"""
               |Merge Small Files
               |==============================================
               |HDFS Path             : ${hdfsPath}
               |Total Available files : ${hdfsFiles.length}
               |Status                : Running
               |
               |""".stripMargin)

          val df = spark.read.format(configs.writeFormat).load(hdfsPath).cache()
          df.repartition(1)
            .write
            .format(configs.writeFormat)
            .mode("overwrite")
            .save(s"/tmp${hdfsPath}")

          df.cache().unpersist()

        spark
          .read
          .format(configs.writeFormat)
          .load(s"/tmp${hdfsPath}")
          .write
          .format(configs.writeFormat)
          .mode("overwrite")
          .save(hdfsPath)

          Processed.ts = Processed.ts.plusHours(1).toDateTime("yyyy-MM-dd'T'HH:00:00")
          println(
            s"""
               |Merge Small Files
               |==============================================
               |HDFS Path             : ${hdfsPath}
               |Total files           : ${hdfsFiles.length}
               |Status                : Completed
               |
               |""".stripMargin)
        }
      }
    }
  }
  def apply(config: Config,spark: SparkSession): AppListener = new AppListener(config,spark)
}

object Processed {
  var ts: DateTime = DateTime.now(DateTimeZone.forID("UTC")).toDateTime("yyyy-MM-dd'T'HH:00:00")
}

有时数据是巨大的,我已经将数据分为多个文件使用以下逻辑。文件大小将约160 MB

val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
    val dataSize = bytes.toLong
    val numPartitions = (bytes.toLong./(1024.0)./(1024.0)./(10240)).ceil.toInt

    df.repartition(if(numPartitions == 0) 1 else numPartitions)
      .[...]

编辑-1
使用这个- spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes,我们可以在 Dataframe 加载到内存中后获得实际 Dataframe 的大小,例如,您可以检查下面的代码。

scala> val df = spark.read.format("orc").load("/tmp/srinivas/")
df: org.apache.spark.sql.DataFrame = [channelGrouping: string, clientId: string ... 75 more fields]

scala> import org.apache.commons.io.FileUtils
import org.apache.commons.io.FileUtils

scala> val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
bytes: BigInt = 763275709

scala> FileUtils.byteCountToDisplaySize(bytes.toLong)
res5: String = 727 MB

scala> import sys.process._
import sys.process._

scala> "hdfs dfs -ls -h /tmp/srinivas/".!
Found 2 items
-rw-r-----   3 svcmxns hdfs          0 2020-04-20 01:46 /tmp/srinivas/_SUCCESS
-rw-r-----   3 svcmxns hdfs    727.4 M 2020-04-20 01:46 /tmp/srinivas/part-00000-9d0b72ea-f617-4092-ae27-d36400c17917-c000.snappy.orc
res6: Int = 0
u4vypkhs

u4vypkhs2#

我们也遇到过类似的问题。在谷歌上搜索了很多次之后,似乎普遍接受的方法是编写另一个作业,该作业经常聚合许多小文件,并将它们写入其他更大的合并文件中。这就是我们现在所做的。
顺便说一句:无论如何,你在这里所能做的是有限制的,因为你拥有的并行性越多,文件的数量就越大,因为每个执行器线程都写它自己的文件。2它们从不写一个共享文件。3这似乎是并行处理的本质。

kfgdxczn

kfgdxczn3#

这是一个关于spark流的常见问题,没有固定的答案。我采用了一种基于附加思想的非传统方法。当你使用spark 2.4.1时,这个解决方案会很有帮助。
因此,如果附加是支持在列文件格式,如Parquet或orc,它本来只是更容易,因为新的数据可以附加在同一个文件和文件大小可以得到越来越大后,每一个微批处理。然而,因为它不支持,我采取了版本化的方法来实现这一点。在每一个微批处理后,数据产生的版本分区。

/prod/mobility/cdr_data/date=01–01–2010/version=12345/file1.parquet
/prod/mobility/cdr_data/date=01–01–2010/version=23456/file1.parquet

我们可以做的是,在每一个微批中,读取旧版本数据,与新的流数据合并,在与新版本相同的路径上重新写入,然后删除旧版本,这样每一个微批之后,每个分区中就只有一个版本和一个文件,每个分区中的文件大小会不断增长,变得越来越大。
由于不允许流数据集和静态数据集合并,我们可以使用forEachBatch sink(spark〉=2.4.0中提供)将流数据集转换为静态数据集。
我已经在链接中描述了如何最佳地实现这一点。您可能想看一下。https://medium.com/@kumar.rahul.nitk/solving-small-file-problem-in-spark-structured-streaming-a-versioning-approach-73a0153a0a

5t7ly7z5

5t7ly7z54#

你可以设置触发器。

df.writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

触发器大小越大,文件大小越大。或者,您可以选择使用调度程序(例如,Airflow)和触发器Trigger.Once()或更好的Trigger.AvailableNow()运行作业。它一个周期只运行作业一次,并使用适当的文件大小处理所有数据。

相关问题