我想设置spark etl批处理作业。我在网上从databricks上读到,使用spark流媒体 Trigger.Once
这是个好主意,因为它负责与批处理作业相关的簿记。
我要做的工作是把Parquet地板放在table上,然后执行 groupby
以及 count
在战场上。示例数据集是:
+-----------+------------+-----------------------+
|file_sha256|machine_guid|time |
+-----------+------------+-----------------------+
|1 |a |2020-10-20 17:26:51.404|
|1 |a |2020-10-20 17:26:51.404|
|1 |b |2020-10-20 17:26:51.404|
|1 |null |2020-10-20 17:26:51.404|
|1 |c |2020-10-20 17:26:51.404|
|2 |a |2020-10-20 17:26:51.404|
|2 |b |2020-10-20 17:26:51.404|
|null |a |2020-10-20 17:26:51.404|
|null |b |2020-10-20 17:26:51.404|
|3 |null |2020-10-20 17:26:51.404|
|null |null |2020-10-20 17:26:51.404|
|4 |e |2020-10-20 17:26:51.404|
+-----------+------------+-----------------------+
root
|-- file_sha256: string (nullable = true)
|-- machine_guid: string (nullable = true)
|-- time: timestamp (nullable = false)
使用常规的非流式spark,我想做的是:
df_agg = (df
.groupby('file_sha256')
.agg(F.count('file_sha256').alias('file_count')))
输出:
+-----------+----------+
|file_sha256|file_count|
+-----------+----------+
| null| 0|
| 1| 5|
| 2| 2|
| 3| 1|
| 4| 1|
+-----------+----------+
我想使用spark流来实现这一点,但是在写入聚合数据时,它会不断出错。我得到的是:
df = (spark
.readStream
.schema(df_schema)
.parquet('test_raw_data'))
数据集 test_raw_data
与上面显示的测试数据集完全相同
df_agg = (df
.withWatermark('time', '1 seconds')
.groupby('file_sha256')
.agg(F.count('file_sha256').alias('file_count')))
(df_agg
.writeStream
.trigger(once=True)
.option('checkpointLocation', 'checkpoint')
.outputMode('append')
.start(path='agg_stream'))
当我尝试此操作时,不断出现以下错误:
Py4JJavaError: An error occurred while calling o93.start.
: org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Aggregate [file_sha256#52], [file_sha256#52, count(file_sha256#52) AS file_count#71L]
+- EventTimeWatermark time#54: timestamp, interval 1 seconds
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@7900ef5f,parquet,List(),Some(StructType(StructField(file_sha256,StringType,true), StructField(machine_guid,StringType,true), StructField(time,TimestampType,false))),List(),None,Map(path -> test_raw_data),None), FileSource[test_raw_data], [file_sha256#52, machine_guid#53, time#54]
我不知道为什么它会失败,因为它缺少在聚合中指定的水印。我在这里看到过其他的帖子,但似乎都不管用。
我的问题是,如何将此聚合作为流数据集来执行?
我试过用pyspark来回答这个问题,但我认为用哪种语言来回答这个问题并不重要。
暂无答案!
目前还没有任何答案,快来回答吧!