spark 3.3.2
python3.10
I have read the Structured Streaming documentation here, but I still cannot get writeStream
to actually persist anything to disk for my use case. I am currently streaming data from a Kafka topic, doing some transformations, and writing the result to files. This is what I currently have:
# read data from topic
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "my_broker:9092") \
    .option("subscribe", "my_topic") \
    .option("startingOffsets", "earliest") \
    .option("maxOffsetsPerTrigger", 10000) \
    .option("failOnDataLoss", "false") \
    .load()
# convert the byte array into a dict/Map object
df1 = df.selectExpr("cast(value as string) as jsonString") \
    .select(from_json(col("jsonString"), schema).alias("json"))
# write dataframe to disk
df1.writeStream \
    .foreachBatch(my_func) \
    .format("hudi") \
    .option("hoodie.table.name", "my_table") \
    .option("hoodie.datasource.write.precombine.field", "precombine_key") \
    .option("hoodie.datasource.write.recordkey.field", "record_key") \
    .option("path", "/path/to/output/") \
    .option("checkpointLocation", "/path/to/checkpoint/") \
    .trigger(processingTime="1 minutes") \
    .start() \
    .awaitTermination()
def my_func(batch_df, batch_id):
    # process the JSON into tabular format and persist to disk
    processed_df = batch_df.select(
        col("json").getItem("payload").getItem("record_key").alias("id"),
        col("json").getItem("payload").getItem("precombine_key").alias("name"),
        explode(col("json").getItem("payload")).alias("keys")
    )
    return processed_df
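For reference, the listing above assumes a running SparkSession, the usual pyspark.sql imports, and a schema object matching the Kafka message payload; none of these appear in the original post, so the sketch below is purely illustrative:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import MapType, StringType, StructField, StructType

spark = SparkSession.builder.appName("kafka_to_hudi").getOrCreate()

# hypothetical schema for the JSON messages; the real payload layout is not shown in the post
schema = StructType([
    StructField("payload", MapType(StringType(), StringType()), True)
])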
When I run this, I don't see anything being written to the specified directory /path/to/output/.
However, if I make the following changes to the writeStream
options and to my_func
:
# write dataframe to disk
df1.writeStream \
    .foreachBatch(my_func) \
    .trigger(processingTime="1 minutes") \
    .start() \
    .awaitTermination()
def my_func(batch_df, batch_id):
    # process the JSON into tabular format and persist to disk
    processed_df = batch_df.select(
        col("json").getItem("payload").getItem("id").alias("id"),
        col("json").getItem("payload").getItem("name").alias("name"),
        explode(col("json").getItem("payload")).alias("keys")
    )
    processed_df.write \
        .format("hudi") \
        .option("hoodie.table.name", "my_table") \
        .option("hoodie.datasource.write.precombine.field", "precombine_key") \
        .option("hoodie.datasource.write.recordkey.field", "record_key") \
        .option("path", "/path/to/output/") \
        .option("checkpointLocation", "/path/to/checkpoint/") \
        .mode("append") \
        .save()
then it works. But essentially I'm just writing batch DataFrames to the path
rather than actually "streaming" the data. Is there something I'm not understanding? Using write
inside my writeStream.foreachBatch
feels like it defeats the purpose of using writeStream
in the first place.
1 Answer
When using
foreachBatch
, the write happens inside the function. In your case I don't think
foreachBatch
is necessary at all:
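The answer's original code block did not survive extraction; below is a minimal sketch of what it suggests, reusing the transformation and Hudi options from the question. The flattening is applied to the streaming DataFrame itself and the stream is written directly, with no foreachBatch:
# apply the same flattening to the streaming DataFrame itself
flattened_df = df1.select(
    col("json").getItem("payload").getItem("record_key").alias("id"),
    col("json").getItem("payload").getItem("precombine_key").alias("name"),
    explode(col("json").getItem("payload")).alias("keys")
)

# stream straight into the Hudi table; each micro-batch is appended automatically
flattened_df.writeStream \
    .format("hudi") \
    .option("hoodie.table.name", "my_table") \
    .option("hoodie.datasource.write.precombine.field", "precombine_key") \
    .option("hoodie.datasource.write.recordkey.field", "record_key") \
    .option("path", "/path/to/output/") \
    .option("checkpointLocation", "/path/to/checkpoint/") \
    .outputMode("append") \
    .trigger(processingTime="1 minute") \
    .start() \
    .awaitTermination()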