我正在使用HoodieDeltaStreamer连接Kafka并将数据存储到帽衫表Hudi版本:0.10.1Spark:3.2.4 Hadoop:3.3.5
只有一个spark-submit作业正在运行
命令:
spark-submit --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer "file://$HUDI_HOME/docker/hoodie/hadoop/hive_base/target/hoodie-utilities.jar" --continuous --table-type COPY_ON_WRITE --source-class org.apache.hudi.utilities.sources.AvroKafkaSource --source-ordering-field submit_date --target-base-path "hdfs://172.16.0.132:9000/data-lake/raw-zone/tables/temp_cow" --target-table temp_cow --props "file://$HUDI_HOME/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source-Table_May_live_v3.properties" --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider --source-limit 50000 > /data/data-lake/logs/pull_from_kafka_topic1.log 2>&1 &
字符串
StackTrace:
2023-06-27 12:23:37,238 INFO sources.AvroKafkaSource: About to read 0 from Kafka for topic :Table_May_live_v3
2023-06-27 12:23:37,238 INFO deltastreamer.DeltaSync: No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=(Option{val=Table_May_live_v3,0:200000}). New Checkpoint=(TempLMSTable_May_live_v3,0:200000)
2023-06-27 12:23:37,240 ERROR deltastreamer.HoodieDeltaStreamer: Shutting down delta-sync due to exception
java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:494)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1729)
at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1752)
at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1749)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1764)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1760)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.refreshTimeline(DeltaSync.java:244)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:288)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer$DeltaSyncService.lambda$startService$0(HoodieDeltaStreamer.java:640)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
型
1条答案
按热度按时间eni9jsuy1#
“
Filesystem closed
”通常意味着应用程序正在使用的Hadoop FileSystem示例在应用程序仍在尝试使用它时被关闭。您可以在
apache/hudi
issue 227中看到此错误的示例我猜还有其他代码调用fs.close()。fs客户端在整个jvm中共享
我把spark的核心改为1,错误消失了。请问为什么不能把核心设置为1以上?
在#254中跟踪问题。基本上,这是一场如何在内核数> 2的线程之间关闭文件系统对象的竞赛。
这个特殊的错误应该用PR 620,Hudi 0.5.2+修复。
spark应用程序的一般做法是根本不调用
fs.close()
,Hudi应该没有什么不同。然而,正如@bvaradar向我指出的那样,为了以防万一,可能值得调用super.close()
。一般来说,尽量避免在线程之间共享
FileSystem
示例:如果可能,每个线程都应该有自己的FileSystem
示例。这有助于避免并发问题。如果您的应用程序检测到
FileSystem
示例已关闭(例如,通过捕获IOException
),它可以尝试重新创建它。如果您使用的是Java,请考虑使用try-with-resources或类似的结构来确保资源得到正确清理。