scala 在spark中,有没有一种好的方法可以用一个变化的表来加入流?

o0lyfsai  于 2023-03-12  发布在  Scala
关注(0)|答案(2)|浏览(162)

我们的Spark环境:数据块4.2(包括Apache Spark 2.3.1、Scala 2.11)
我们努力实现的目标:我们想用一些定期更新的参考数据来丰富流数据,丰富是通过加入流和参考数据来完成的。
我们实施的内容:我们实现了两个spark job(jar):第一个是每小时更新一个Spark表TEST_TABLE(我们称之为“reference data”),方法是使用

<dataset>.write.mode(SaveMode.Overwrite).saveAsTable("TEST_TABLE")

然后调用spark.catalog.refreshTable("TEST_TABLE")
第二份工作(我们称之为流式数据)使用Spark Structured Streaming流式阅读一些数据,使用DataFrame.transform()将其与表TEST_TABLE连接,并将其写入另一个系统。我们在.transform()调用的函数中使用spark.read.table(“TEST_TABLE”)读取引用数据,以便获得表中的最新值。每次第一个应用程序更新表时,第二个应用程序都会崩溃。Log4j输出中显示以下消息:

18/08/23 10:34:40 WARN TaskSetManager: Lost task 0.0 in stage 547.0 (TID 5599, 10.139.64.9, executor 0): java.io.FileNotFoundException: dbfs:/user/hive/warehouse/code.db/TEST_TABLE/ part-00000-tid-5184425276562097398-25a0e542-41e4-416f-bae8-469899a72c21-36-c000.snappy.parquet

It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readFile(FileScanRDD.scala:203)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$createNextIterator(FileScanRDD.scala:377)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:295)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:291)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
  at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748

我们还尝试在读取表之前该高速缓存无效,但这会降低性能,并且应用程序仍然崩溃。我们怀疑根本原因是对引用数据集的懒惰评估(它仍然“指向”旧数据,而旧数据已经不存在了)。
您有什么建议吗?我们可以做些什么来防止这个问题,或者使用动态引用数据加入流的最佳方法是什么?

h4cxqtbf

h4cxqtbf1#

连接到参考数据;不要缓存它,这可以确保您转到源。查找由主键+计数器表示的最新版本数据,其中此计数器最接近或等于您在流应用程序中维护的计数器。每写入一小时,再次追加所有仍为当前的引用数据,但计数器递增;也就是说是新版本这里用 parquet 。

dgiusagp

dgiusagp2#

而不是连接表和流。你可以利用spark 2.3.1中的一个新特性,即连接两个流数据。创建一个流而不是一个带水印的表。

Watermarks: Watermarking in Structured Streaming is a way to limit state in all 
stateful streaming operations by specifying how much late data to consider. 
Specifically, a watermark is a moving threshold in event-time that trails behind the 
maximum event-time seen by the query in the processed data. The trailing gap (aka 
watermark delay) defines how long should the engine wait for late data to arrive and 
is specified in the query using withWatermark.

Refer databricks blog

相关问题