我建立了一个管道,从kafka读取数据,使用spark结构化流处理数据,然后将Parquet文件写入hdfs。数据查询的下游客户端正在使用配置为将数据作为配置单元表读取的presto。 Kafka --> Spark --> Parquet on HDFS --> Presto
一般来说,这是可行的。当spark作业运行批处理时发生查询时,就会出现问题。spark作业在hdfs上创建一个零长度的parquet文件。如果presto在处理查询的过程中试图打开此文件,则会抛出一个错误: Query 20171116_170937_07282_489cc failed: Error opening Hive split hdfs://namenode:50071/hive/warehouse/table/part-00000-5a7c242a-3e53-46d0-9ee4-5d004ef4b1e4-c000.snappy.parquet (offset=0, length=0): hdfs://namenode:50071/hive/warehouse/table/part-00000-5a7c242a-3e53-46d0-9ee4-5d004ef4b1e4-c000.snappy.parquet is not a Parquet file (too small)
文件此时确实是零字节,因此错误是完全正确的,但这不是我想要的管道行为。我想能够不断地写在适当的hdfs文件夹,而不干扰presto查询。
作业的spark scala代码如下所示:
val FilesOnDisk = 1
Spark
.initKafkaStream("fleet_profile_test")
.filter(_.name.contains(job.kafkaTag))
.flatMap(job.parser)
.coalesce(FilesOnDisk)
.writeStream
.trigger(ProcessingTime("1 hours"))
.outputMode("append")
.queryName(job.queryName)
.format("parquet")
.option("path", job.outputFilesPath)
.start()
工作在凌晨两点开始。该文件第一次在hdfs上以零长度文件的形式出现在:05。在作业完成之前,直到在:21时将其完全写入,它才会更新。这使得该表在25%的时间内无法使用。
每个文件只有500kb多一点,所以我不希望文件的物理写入花费很长时间。据我所知,parquet文件的元数据位于文件末尾,因此编写更大文件的人会遇到更大的麻烦。
人们在处理presto错误时使用了哪些策略来集成spark结构化流媒体和presto?
1条答案
按热度按时间sz81bmfz1#
您可以尝试说服presto(或presto团队)忽略空文件,但这没有帮助,因为编写该文件的程序(这里是spark)最终将刷新部分数据,并且该文件将显示为部分、非空且格式不正确,因此也会导致错误。
防止presto(或其他读取表数据的程序)看到部分文件的方法是在不同的位置组装文件,然后原子地将文件移动到正确的位置。