使用spark结构化流处理聚合批处理etl作业

m4pnthwp  于 2021-05-19  发布在  Spark
关注(0)|答案(0)|浏览(254)

我想设置spark etl批处理作业。我在网上从databricks上读到,使用spark流媒体 Trigger.Once 这是个好主意,因为它负责与批处理作业相关的簿记。
我要做的工作是把Parquet地板放在table上,然后执行 groupby 以及 count 在战场上。示例数据集是:

+-----------+------------+-----------------------+
|file_sha256|machine_guid|time                   |
+-----------+------------+-----------------------+
|1          |a           |2020-10-20 17:26:51.404|
|1          |a           |2020-10-20 17:26:51.404|
|1          |b           |2020-10-20 17:26:51.404|
|1          |null        |2020-10-20 17:26:51.404|
|1          |c           |2020-10-20 17:26:51.404|
|2          |a           |2020-10-20 17:26:51.404|
|2          |b           |2020-10-20 17:26:51.404|
|null       |a           |2020-10-20 17:26:51.404|
|null       |b           |2020-10-20 17:26:51.404|
|3          |null        |2020-10-20 17:26:51.404|
|null       |null        |2020-10-20 17:26:51.404|
|4          |e           |2020-10-20 17:26:51.404|
+-----------+------------+-----------------------+

root
 |-- file_sha256: string (nullable = true)
 |-- machine_guid: string (nullable = true)
 |-- time: timestamp (nullable = false)

使用常规的非流式spark,我想做的是:

df_agg = (df
          .groupby('file_sha256')
          .agg(F.count('file_sha256').alias('file_count')))

输出:

+-----------+----------+
|file_sha256|file_count|
+-----------+----------+
|       null|         0|
|          1|         5|
|          2|         2|
|          3|         1|
|          4|         1|
+-----------+----------+

我想使用spark流来实现这一点,但是在写入聚合数据时,它会不断出错。我得到的是:

df = (spark
      .readStream
      .schema(df_schema)
      .parquet('test_raw_data'))

数据集 test_raw_data 与上面显示的测试数据集完全相同

df_agg = (df
          .withWatermark('time', '1 seconds')
          .groupby('file_sha256')
          .agg(F.count('file_sha256').alias('file_count')))

(df_agg
 .writeStream
 .trigger(once=True)
 .option('checkpointLocation', 'checkpoint')
 .outputMode('append')
 .start(path='agg_stream'))

当我尝试此操作时,不断出现以下错误:

Py4JJavaError: An error occurred while calling o93.start.
: org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Aggregate [file_sha256#52], [file_sha256#52, count(file_sha256#52) AS file_count#71L]
+- EventTimeWatermark time#54: timestamp, interval 1 seconds
   +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@7900ef5f,parquet,List(),Some(StructType(StructField(file_sha256,StringType,true), StructField(machine_guid,StringType,true), StructField(time,TimestampType,false))),List(),None,Map(path -> test_raw_data),None), FileSource[test_raw_data], [file_sha256#52, machine_guid#53, time#54]

我不知道为什么它会失败,因为它缺少在聚合中指定的水印。我在这里看到过其他的帖子,但似乎都不管用。
我的问题是,如何将此聚合作为流数据集来执行?
我试过用pyspark来回答这个问题,但我认为用哪种语言来回答这个问题并不重要。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题