在我的项目中有一个场景,我正在使用spark-sql-2.4.1版本阅读kafka主题消息。我可以使用结构化流媒体处理一天。一旦数据被接收和处理后,我需要将数据保存到各自的Parquet文件在hdfs商店。
我能够存储和读取Parquet文件,我保持了15秒到1分钟的触发时间。这些文件的大小非常小,因此会产生许多文件。
这些Parquet文件需要稍后由配置单元查询读取。
那么1)这个策略在生产环境中有效吗?还是会导致以后的小文件问题?
2) 处理/设计此类场景(即行业标准)的最佳实践是什么?
3) 这些事情在生产中通常是如何处理的?
谢谢您。
3条答案
按热度按时间s71maibg1#
我知道这个问题太老了。我也遇到了类似的问题&我使用spark结构化流式查询监听器来解决这个问题。
我的用例是从kafka获取数据并用年、月、日和小时分区存储在hdfs中。
下面的代码将采取前一个小时的分区数据,应用重新分区和覆盖现有分区中的数据。
有时数据是巨大的&我使用下面的逻辑将数据划分为多个文件。文件大小约为160MB
编辑-1
使用这个-spark.sessionstate.executeplan(df.queryexecution.logical).optimizedplan.stats(spark.sessionstate.conf).sizeinbytes我们可以得到实际Dataframe加载到内存后的大小,例如,您可以检查下面的代码。
xzlaal3s2#
我们也有类似的问题。在google搜索了很多次之后,似乎普遍接受的方法是编写另一个作业,该作业经常聚合许多小文件,然后将它们写入更大的合并文件中。这就是我们现在要做的。
顺便说一句:不管怎样,您在这里可以做的事情是有限制的,因为并行度越高,文件的数量就越多,因为每个执行器线程都会写入自己的文件。它们从不写入共享文件。这似乎就是并行处理的本质。
7uhlpewt3#
这是一个常见的燃烧问题的Spark流没有任何固定的答案。我采取了一种基于附加思想的非传统方法。当您使用spark 2.4.1时,此解决方案将非常有用。
因此,如果append支持parquet或orc这样的柱状文件格式,那么就更容易了,因为新数据可以被追加到同一个文件中,并且在每个微批处理之后,文件大小会越来越大。但是,由于它不受支持,我采用了版本控制方法来实现这一点。在每个微批处理之后,用一个版本分区生成数据。例如
我们可以做的是,在每个微批中,读取旧版本数据,将其与新的流数据合并,然后在与新版本相同的路径上再次写入。然后,删除旧版本。这样,在每个微批处理之后,每个分区都会有一个版本和一个文件。每个分区中的文件大小将不断增大。
由于不允许流数据集和静态数据集合并,我们可以使用foreachbatch sink(在spark>=2.4.0中提供)将流数据集转换为静态数据集。
我在链接中描述了如何以最佳方式实现这一点。你可能想看看。https://medium.com/@kumar.rahul.nitk/solving-small-file-problem-in-spark-structured-streaming-a-versioning-approach-73a0153a0a小文件