我有一个小Spark簇,有一个主Spark簇和两个工作Spark簇。我有一个Kafka流应用程序,从Kafka流数据,并写在Parquet格式和附加模式的目录。
到目前为止,我能够读取Kafka流并使用下面的关键行将其写入Parquet文件。
val streamingQuery = mydf.writeStream.format("parquet").option("path", "/root/Desktop/sampleDir/myParquet").outputMode(OutputMode.Append).option("checkpointLocation", "/root/Desktop/sampleDir/myCheckPoint").start()
我把两个工人都登记了。有3-4个snappyParquet文件已创建,文件名的前缀为 part-00006-XXX.snappy.parquet
.
但当我尝试使用以下命令读取此Parquet文件时:
val dfP = sqlContext.read.parquet("/root/Desktop/sampleDir/myParquet")
它显示了一些ParquetParquet分割文件的“找不到文件”异常。奇怪的是,这些文件已经存在于其中一个工作节点中。
当在日志中进一步检查时,发现spark正在尝试从一个worker节点获取所有的parquet文件,并且由于不是所有的parquet文件都存在于一个worker中,因此它正在命中,只是在提到的parquet路径中找不到这些文件。
在流式查询或读取数据时是否遗漏了一些关键步骤?
注意:我没有hadoop基础设施。我只想使用文件系统。
1条答案
按热度按时间k4ymrczo1#
你需要一个共享文件系统。
spark假设从所有节点(驱动程序和工作程序)都可以看到相同的文件系统。如果您使用的是基本文件系统,那么每个节点都会看到自己的文件系统,这与其他节点的文件系统不同。
hdfs是获得公共共享文件系统的一种方法,另一种方法是使用公共nfs挂载(即,将同一远程文件系统从所有节点挂载到同一路径)。其他共享文件系统也存在。