PySpark: trying to understand why writeStream does not write anything to the specified path

oxiaedzo · posted 2023-08-02 in Spark
Spark 3.3.2
Python 3.10

I have read the Structured Streaming documentation here, but I still can't get writeStream to actually persist anything to disk for my use case. I am streaming data from a Kafka topic, doing some transformations, and writing the result to files. This is what I have so far:

from pyspark.sql.functions import col, explode, from_json

# read data from the Kafka topic
df = spark \
     .readStream \
     .format("kafka") \
     .option("kafka.bootstrap.servers", "my_broker:9092") \
     .option("subscribe", "my_topic") \
     .option("startingOffsets", "earliest") \
     .option("maxOffsetsPerTrigger", 10000) \
     .option("failOnDataLoss", "false") \
     .load()

# convert the byte array into a dict/Map object
df1 = df.selectExpr("cast(value as string) as jsonString").select(from_json(col("jsonString"), schema).alias("json"))

# write dataframe to disk
df1.writeStream \
   .foreachBatch(my_func)\
   .format("hudi")\
   .option("hoodie.table.name", "my_table")\
   .option("hoodie.datasource.write.precombine.field", "precombine_key")\
   .option("hoodie.datasource.write.recordkey.field", "record_key")\
   .option("path", "/path/to/output/")\
   .option("checkpointLocation", "/path/to/checkpoint/")\
   .trigger(processingTime="1 minutes")\
   .start()\
   .awaitTermination() 

def my_func(batch_df, batch_id):
    # process the JSON into tabular format and persist to disk
    processed_df = batch_df.select(
            col("json").getItem("payload").getItem("record_key").alias("id"),
            col("json").getItem("payload").getItem("precombine_key").alias("name"),
            explode(col("json").getItem("payload")).alias("keys")
        )
    return processed_df


When I run this, I don't see anything written to the specified directory /path/to/output/. However, if I make the following changes to the writeStream options and my_func:

# write dataframe to disk
df1.writeStream \
   .foreachBatch(my_func)\
   .trigger(processingTime="1 minutes")\
   .start()\
   .awaitTermination() 

def my_func(batch_df, batch_id):
    # process the JSON into tabular format and persist to disk
    processed_df = batch_df.select(
            col("json").getItem("payload").getItem("id").alias("id"),
            col("json").getItem("payload").getItem("name").alias("name"),
            explode(col("json").getItem("payload")).alias("keys")
        )
    processed_df.write\
                .format("hudi")\
                .option("hoodie.table.name", "my_table")\
                .option("hoodie.datasource.write.precombine.field", "precombine_key")\
                .option("hoodie.datasource.write.recordkey.field", "record_key")\
                .option("path", "/path/to/output/")\
                .option("checkpointLocation", "/path/to/checkpoint/")\
                .mode("append")\
                .save()


This works. But essentially I am just doing a batch write of the DataFrame to path rather than actually "streaming" the data. What am I not understanding? Calling write inside my writeStream.foreachBatch feels like it defeats the purpose of using writeStream.

deyfvvtc 1#

When foreachBatch is used, the write happens inside the function.
I think foreachBatch is unnecessary in your case:

def my_func(df):
    # process the JSON into tabular format and persist to disk
    processed_df = df.select(
        col("json").getItem("payload").getItem("record_key").alias("id"),
        col("json").getItem("payload").getItem("precombine_key").alias("name"),
        explode(col("json").getItem("payload")).alias("keys")
    )
    return processed_df

df2 = my_func(df1)

query_handle = (
    df2
    .writeStream
    .format("hudi")
    .option("hoodie.table.name", "my_table")
    .option("hoodie.datasource.write.precombine.field", "precombine_key")
    .option("hoodie.datasource.write.recordkey.field", "record_key")
    .option("path", "/path/to/output/")
    .option("checkpointLocation", "/path/to/checkpoint/")
    .trigger(processingTime="1 minutes")
    .start()
)
query_handle.awaitTermination()
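
If you do want to keep foreachBatch, the point above still applies: the write has to happen inside the callback, and the only sink-related option that belongs on the outer writeStream is checkpointLocation. Below is a minimal sketch of that pattern, reusing the select and Hudi options from the question (the write_batch name is just illustrative):

from pyspark.sql.functions import col, explode

# The write happens inside the callback; the outer writeStream only drives
# the micro-batches and records progress under checkpointLocation.
def write_batch(batch_df, batch_id):
    processed_df = batch_df.select(
        col("json").getItem("payload").getItem("record_key").alias("id"),
        col("json").getItem("payload").getItem("precombine_key").alias("name"),
        explode(col("json").getItem("payload")).alias("keys")
    )
    processed_df.write \
        .format("hudi") \
        .option("hoodie.table.name", "my_table") \
        .option("hoodie.datasource.write.precombine.field", "precombine_key") \
        .option("hoodie.datasource.write.recordkey.field", "record_key") \
        .mode("append") \
        .save("/path/to/output/")

df1.writeStream \
   .foreachBatch(write_batch) \
   .option("checkpointLocation", "/path/to/checkpoint/") \
   .trigger(processingTime="1 minutes") \
   .start() \
   .awaitTermination()

Either approach streams the data; foreachBatch is there for cases where you need arbitrary batch logic per micro-batch (deduplication, joins, writing to multiple sinks), while the plain format("hudi") sink shown above is simpler when a single write is all you need.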

