streamwriter与旧服务器

sqxo8psd  于 2021-06-28  发布在  Hive
关注(0)|答案(0)|浏览(254)

我正在尝试通过spark2流处理一些数据并将它们保存到hdfs。在流运行时,我想通过thrift server通过简单的选择读取存储的数据:

SELECT COUNT(*) FROM stream_table UNION ALL SELECT COUNT(*) FROM thisistable;

但我有个例外
错误:org.apache.spark.sparkexception:由于阶段失败而中止作业:阶段5.0中的任务0失败了1次,最近的失败:阶段5.0中的任务0.0丢失(tid 6,localhost):java.lang.runtimeexception:hdfs://5b6b8bf723a2:9000/archivedata/parquets/efc44dd4-1792-4b6d-b0f2-120818047b1b不是parquet.hadoop.parquetfilereader.readfooter(parquetfilereader)处的Parquet文件(太小)。java:412)在parquet.hadoop.parquetfilereader.readfooter(parquetfilereader。java:385)在parquet.hadoop.parquetfilereader.readfooter(parquetfilereader。java:371)在org.apache.hadoop.hive.ql.io.parquet.read.parquetrecordreaderwrapper.getsplit(parquetrecordreaderwrapper。java:252)访问org.apache.hadoop.hive.ql.io.parquet.read.parquetrecordreaderwrapper.(parquetrecordreaderwrapper。java:99)在org.apache.hadoop.hive.ql.io.parquet.read.parquetrecordreaderwrapper.(parquetrecordreaderwrapper。java:85)在org.apache.hadoop.hive.ql.io.parquet.mapredparquetinputformat.getrecordreader(mapredparquetinputformat。java:72)在org.apache.spark.rdd.hadooprdd$$anon$1。scala:246)在org.apache.spark.rdd.hadooprdd.compute(hadooprdd。scala:102) 在org.apache.spark.rdd.rdd.iterator(rdd。scala:283)(rdd。scala:319)在org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd。scala:319)scala:38)在org.apache.spark.rdd.mappartitionsrdd.compute(mappartitionsrdd。scala:38)在org.apache.spark.rdd.rdd.iterator(rdd。scala:283)(rdd。scala:319)在org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd。scala:319)scala:38)在org.apache.spark.rdd.mappartitionsrdd.compute(mappartitionsrdd。scala:38)在org.apache.spark.rdd.rdd.iterator(rdd。scala:283)(rdd。scala:319)在org.apache.spark.rdd.rdd.computeorreadcheckpoint(rdd。scala:319)scala:38)在org.apache.spark.scheduler.shufflemaptask.runtask(shufflemaptask。scala:79)在org.apache.spark.scheduler.task.run(task。scala:85)shufflemaptask公司。scala:47)位于java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1142)在java.lang.thread.run(线程。java:745)或$worker.run(threadpoolexecutor。java:617)
我的假设是spark将在批处理开始时创建一个空的parquet文件,并在批处理结束时填充它,我正在运行一个 select 通过存档文件,但其中一个是空的,因为实际批次尚未完成。
简单的Spark流示例(用于模拟转换延迟的thread.sleep)

spark
    .readStream()
    .schema(schema)
    .json("/tmp")
    .filter(x->{
        Thread.sleep(1000);
        return true;
    })
    .writeStream()
    .format("parquet")
    .queryName("thisistable")
    .start()
    .awaitTermination();

有没有一种方法可以让我避免这个异常,并且使用thrift server只获取完成的文件?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题