使用spark acid writestream(带检查点)引发org.apache.hadoop.fs.filealreadyexistsexception的spark结构化流

kfgdxczn  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(540)

在我们的spark应用程序中,我们使用 Spark structured streaming . 它使用 Kafka as input stream , & HiveAcid as writeStream 到Hive表。为了 HiveAcid ,它是一个名为 spark acidqubole : https://github.com/qubole/spark-acid
以下是我们的代码:

import za.co.absa.abris.avro.functions.from_confluent_avro
....

val spark = SparkSession
  .builder()
  .appName("events")
  .config("spark.sql.streaming.metricsEnabled", true)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._

val input_stream_df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("startingOffsets", '{"events":{"0":2310384922,"1":2280420020,"2":2278027233,"3":2283047819,"4":2285647440}}')
  .option("maxOffsetsPerTrigger", 10000)
  .option("subscribe", "events")
  .load()

// schema registry config
val srConfig = Map(
  "schema.registry.url"           -> "http://schema-registry:8081",
  "value.schema.naming.strategy"  -> "topic.name",
  "schema.registry.topic"         -> "events",
  "value.schema.id"               -> "latest"
)

val data = input_stream_df
  .withColumn("value", from_confluent_avro(col("value"), srConfig))
  .withColumn("timestamp_s", from_unixtime($"value.timestamp" / 1000))
  .select(
    $"value.*",
    year($"timestamp_s")       as 'year,
    month($"timestamp_s")      as 'month,
    dayofmonth($"timestamp_s") as 'day
  )

// format "HiveAcid" is provided by spark-acid lib from Qubole
val output_stream_df = data.writeStream.format("HiveAcid")
  .queryName("hiveSink")
  .option("database", "default")
  .option("table", "events_sink")
  .option("checkpointLocation", "/user/spark/events/checkpoint")
  .option("spark.acid.streaming.log.metadataDir", "/user/spark/events/checkpoint/spark-acid")
  .option("metastoreUri", "thrift://hive-metastore:9083")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

output_stream_df.awaitTermination()

我们可以将该应用程序部署到生产环境中,并多次(大约10次)重新部署它而不会出现问题。然后它遇到了以下错误:
查询hivesink[id=080a9f25-23d2-4ec8-a8c0-1634398d6d29,runid=990d3bba-0f7f-4bae-9f41-b43db6d1aeb3]终止,异常:由于阶段失败而中止作业:阶段0.0中的任务3失败4次,最近的失败:阶段0.0中的任务3.3丢失(tid 42,10.236.7.228,executor 3):org.apache.hadoop.fs.filealreadyexistException:/warehouse/tablespace/managed/hive/events/year=2020/month=5/day=18/delta_/bucket_forclient 10.236.7.228已经存在(…),位于com.qubole.shaded.orc.impl.physicalfswriter.(physicalfswriter)。java:95)在com.qubole.shaded.orc.impl.writerimpl.(writerimpl。java:177)在com.qubole.shaded.hadoop.hive.ql.io.orc.writerimpl.(writerimpl。java:94)在com.qubole.shaded.hadoop.hive.ql.io.orc.orcfile.createwriter(orcfile。java:334)在com.qubole.shaded.hadoop.hive.ql.io.orc.orcrecordupdater.initwriter(orcrecordupdater。java:602)在com.qubole.shaded.hadoop.hive.ql.io.orc.orcrecordupdater.addsimpleevent(orcrecordupdater。java:423)在com.qubole.shaded.hadoop.hive.ql.io.orc.orcrecordupdater.addsplitupdateevent(orcrecordupdater。java:432)在com.qubole.shaded.hadoop.hive.ql.io.orc.orcrecordupdater.insert(orcrecordupdater。java:484)在com.qubole.spark.hiveacid.writer.hive.hiveacidfullacidwriter.process(hiveacidwriter。scala:295)在com.qubole.spark.hiveacid.writer.tablewriter$$anon$1$$anonfun$6.apply(tablewriter。scala:153)在com.qubole.spark.hiveacid.writer.tablewriter$$anon$1$$anonfun$6.apply(tablewriter。scala:153)(…)在com.qubole.spark.hiveacid.writer.tablewriter$$anon$1.apply(tablewriter。scala:153)在com.qubole.spark.hiveacid.writer.tablewriter$$anon$1.apply(tablewriter。scala:139)
每次重新启动应用程序时,都会显示不同的 delta + bucket files 已存在错误。但是,这些文件在每次启动时都是新创建的(最有可能),但是不知道为什么会抛出错误。
任何指针都将不胜感激。

wwwo4jvm

wwwo4jvm1#

我从工人的错误日志中找到了真正的根本原因。这是由于我在使用的一个库中所做的代码更改导致的 out of memory 问题。
我之前发布的是来自驱动程序的错误日志,在worker节点上多次失败之后。

相关问题