如何处理spark结构化流媒体中的小文件问题?

5hcedyr0  于 2021-07-12  发布在  Spark
关注(0)|答案(3)|浏览(477)

在我的项目中有一个场景,我正在使用spark-sql-2.4.1版本阅读kafka主题消息。我可以使用结构化流媒体处理一天。一旦数据被接收和处理后,我需要将数据保存到各自的Parquet文件在hdfs商店。
我能够存储和读取Parquet文件,我保持了15秒到1分钟的触发时间。这些文件的大小非常小,因此会产生许多文件。
这些Parquet文件需要稍后由配置单元查询读取。
那么1)这个策略在生产环境中有效吗?还是会导致以后的小文件问题?
2) 处理/设计此类场景(即行业标准)的最佳实践是什么?
3) 这些事情在生产中通常是如何处理的?
谢谢您。

s71maibg

s71maibg1#

我知道这个问题太老了。我也遇到了类似的问题&我使用spark结构化流式查询监听器来解决这个问题。
我的用例是从kafka获取数据并用年、月、日和小时分区存储在hdfs中。
下面的代码将采取前一个小时的分区数据,应用重新分区和覆盖现有分区中的数据。

val session = SparkSession.builder().master("local[2]").enableHiveSupport().getOrCreate()
session.streams.addListener(AppListener(config,session))

class AppListener(config: Config,spark: SparkSession) extends StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    this.synchronized {AppListener.mergeFiles(event.progress.timestamp,spark,config)}
  }
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
}

object AppListener {

  def mergeFiles(currentTs: String,spark: SparkSession,config:Config):Unit = {
    val configs = config.kafka(config.key.get)
    if(currentTs.datetime.isAfter(Processed.ts.plusMinutes(5))) {

      println(
        s"""
           |Current Timestamp     :     ${currentTs}
           |Merge Files           :     ${Processed.ts.minusHours(1)}
           |
           |""".stripMargin)

      val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
      val ts = Processed.ts.minusHours(1)
      val hdfsPath = s"${configs.hdfsLocation}/year=${ts.getYear}/month=${ts.getMonthOfYear}/day=${ts.getDayOfMonth}/hour=${ts.getHourOfDay}"
      val path = new Path(hdfsPath)

      if(fs.exists(path)) {

      val hdfsFiles = fs.listLocatedStatus(path)
        .filter(lfs => lfs.isFile && !lfs.getPath.getName.contains("_SUCCESS"))
        .map(_.getPath).toList

      println(
        s"""
           |Total files in HDFS location  : ${hdfsFiles.length}
           | ${hdfsFiles.length > 1}
           |""".stripMargin)

      if(hdfsFiles.length > 1) {

          println(
            s"""
               |Merge Small Files
               |==============================================
               |HDFS Path             : ${hdfsPath}
               |Total Available files : ${hdfsFiles.length}
               |Status                : Running
               |
               |""".stripMargin)

          val df = spark.read.format(configs.writeFormat).load(hdfsPath).cache()
          df.repartition(1)
            .write
            .format(configs.writeFormat)
            .mode("overwrite")
            .save(s"/tmp${hdfsPath}")

          df.cache().unpersist()

        spark
          .read
          .format(configs.writeFormat)
          .load(s"/tmp${hdfsPath}")
          .write
          .format(configs.writeFormat)
          .mode("overwrite")
          .save(hdfsPath)

          Processed.ts = Processed.ts.plusHours(1).toDateTime("yyyy-MM-dd'T'HH:00:00")
          println(
            s"""
               |Merge Small Files
               |==============================================
               |HDFS Path             : ${hdfsPath}
               |Total files           : ${hdfsFiles.length}
               |Status                : Completed
               |
               |""".stripMargin)
        }
      }
    }
  }
  def apply(config: Config,spark: SparkSession): AppListener = new AppListener(config,spark)
}

object Processed {
  var ts: DateTime = DateTime.now(DateTimeZone.forID("UTC")).toDateTime("yyyy-MM-dd'T'HH:00:00")
}

有时数据是巨大的&我使用下面的逻辑将数据划分为多个文件。文件大小约为160MB

val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
    val dataSize = bytes.toLong
    val numPartitions = (bytes.toLong./(1024.0)./(1024.0)./(10240)).ceil.toInt

    df.repartition(if(numPartitions == 0) 1 else numPartitions)
      .[...]

编辑-1
使用这个-spark.sessionstate.executeplan(df.queryexecution.logical).optimizedplan.stats(spark.sessionstate.conf).sizeinbytes我们可以得到实际Dataframe加载到内存后的大小,例如,您可以检查下面的代码。

scala> val df = spark.read.format("orc").load("/tmp/srinivas/")
df: org.apache.spark.sql.DataFrame = [channelGrouping: string, clientId: string ... 75 more fields]

scala> import org.apache.commons.io.FileUtils
import org.apache.commons.io.FileUtils

scala> val bytes = spark.sessionState.executePlan(df.queryExecution.logical).optimizedPlan.stats(spark.sessionState.conf).sizeInBytes
bytes: BigInt = 763275709

scala> FileUtils.byteCountToDisplaySize(bytes.toLong)
res5: String = 727 MB

scala> import sys.process._
import sys.process._

scala> "hdfs dfs -ls -h /tmp/srinivas/".!
Found 2 items
-rw-r-----   3 svcmxns hdfs          0 2020-04-20 01:46 /tmp/srinivas/_SUCCESS
-rw-r-----   3 svcmxns hdfs    727.4 M 2020-04-20 01:46 /tmp/srinivas/part-00000-9d0b72ea-f617-4092-ae27-d36400c17917-c000.snappy.orc
res6: Int = 0
xzlaal3s

xzlaal3s2#

我们也有类似的问题。在google搜索了很多次之后,似乎普遍接受的方法是编写另一个作业,该作业经常聚合许多小文件,然后将它们写入更大的合并文件中。这就是我们现在要做的。
顺便说一句:不管怎样,您在这里可以做的事情是有限制的,因为并行度越高,文件的数量就越多,因为每个执行器线程都会写入自己的文件。它们从不写入共享文件。这似乎就是并行处理的本质。

7uhlpewt

7uhlpewt3#

这是一个常见的燃烧问题的Spark流没有任何固定的答案。我采取了一种基于附加思想的非传统方法。当您使用spark 2.4.1时,此解决方案将非常有用。
因此,如果append支持parquet或orc这样的柱状文件格式,那么就更容易了,因为新数据可以被追加到同一个文件中,并且在每个微批处理之后,文件大小会越来越大。但是,由于它不受支持,我采用了版本控制方法来实现这一点。在每个微批处理之后,用一个版本分区生成数据。例如

/prod/mobility/cdr_data/date=01–01–2010/version=12345/file1.parquet
/prod/mobility/cdr_data/date=01–01–2010/version=23456/file1.parquet

我们可以做的是,在每个微批中,读取旧版本数据,将其与新的流数据合并,然后在与新版本相同的路径上再次写入。然后,删除旧版本。这样,在每个微批处理之后,每个分区都会有一个版本和一个文件。每个分区中的文件大小将不断增大。
由于不允许流数据集和静态数据集合并,我们可以使用foreachbatch sink(在spark>=2.4.0中提供)将流数据集转换为静态数据集。
我在链接中描述了如何以最佳方式实现这一点。你可能想看看。https://medium.com/@kumar.rahul.nitk/solving-small-file-problem-in-spark-structured-streaming-a-versioning-approach-73a0153a0a小文件

相关问题