我正在尝试通过spark2流处理一些数据并将它们保存到hdfs。在流运行时,我想通过thrift server通过简单的选择读取存储的数据:
SELECT COUNT(*) FROM stream_table UNION ALL SELECT COUNT(*) FROM thisistable;
但我有个例外
错误:org.apache.spark.sparkexception:由于阶段失败而中止作业:阶段5.0中的任务0失败了1次,最近的失败:阶段5.0中的任务0.0丢失(tid 6,localhost):java.lang.runtimeexception:hdfs://5b6b8bf723a2:9000/archivedata/parquets/efc44dd4-1792-4b6d-b0f2-120818047b1b不是parquet.hadoop.parquetfilereader.readfooter(parquetfilereader)处的Parquet文件(太小)。java:412)在parquet.hadoop.parquetfilereader.readfooter(parquetfilereader。java:385)在parquet.hadoop.parquetfilereader.readfooter(parquetfilereader。java:371)在org.apache.hadoop.hive.ql.io.parquet.read.parquetrecordreaderwrapper.getsplit(parquetrecordreaderwrapper。java:252)访问org.apache.hadoop.hive.ql.io.parquet.read.parquetrecordreaderwrapper.(parquetrecordreaderwrapper。java:99)在org.apache.hadoop.hive.ql.io.parquet.read.parquetrecordreaderwrapper.(parquetrecordreaderwrapper。java:85)在org.apache.hadoop.hive.ql.io.parquet.mapredparquetinputformat.getrecordreader(mapredparquetinputformat。java:72)在org.apache.spark.rdd.hadooprdd$$anon$1。scala:246)在org.apache.spark.rdd.hadooprdd.compute(hadooprdd。scala:102) 在org.apache.spark.rdd.rdd.iterator(rdd。scala:283)(rdd。scala:319)在org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd。scala:319)scala:38)在org.apache.spark.rdd.mappartitionsrdd.compute(mappartitionsrdd。scala:38)在org.apache.spark.rdd.rdd.iterator(rdd。scala:283)(rdd。scala:319)在org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd。scala:319)scala:38)在org.apache.spark.rdd.mappartitionsrdd.compute(mappartitionsrdd。scala:38)在org.apache.spark.rdd.rdd.iterator(rdd。scala:283)(rdd。scala:319)在org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd。scala:319)scala:38)在org.apache.spark.scheduler.shufflemaptask.runtask(shufflemaptask。scala:79)在org.apache.spark.scheduler.task.run(task。scala:85)shufflemaptask公司。scala:47)位于java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1142)在java.lang.thread.run(线程。java:745)或$worker.run(threadpoolexecutor。java:617)
我的假设是spark将在批处理开始时创建一个空的parquet文件,并在批处理结束时填充它,我正在运行一个 select
通过存档文件,但其中一个是空的,因为实际批次尚未完成。
简单的Spark流示例(用于模拟转换延迟的thread.sleep)
spark
.readStream()
.schema(schema)
.json("/tmp")
.filter(x->{
Thread.sleep(1000);
return true;
})
.writeStream()
.format("parquet")
.queryName("thisistable")
.start()
.awaitTermination();
有没有一种方法可以让我避免这个异常,并且使用thrift server只获取完成的文件?
暂无答案!
目前还没有任何答案,快来回答吧!