pyspark-kafka结构化流媒体:写入时出错

a5g8bdjr  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(336)

在pyspark中,我可以通过两个不同的步骤从一个kafka主题中读取流,并将(转换后的)数据写回另一个kafka主题。代码如下:


# Define Stream:

df = spark \
     .readStream \
     .format("kafka") \
     .option("kafka.bootstrap.servers", "localhost:9092") \
     .option("subscribe", "instream") \
     .load()

# Transform

matchdata = df.select(from_json(F.col("value").cast("string"),schema).alias("value"))\
          .select(F.col('value').cast("string"))

# Stream the data, from a Kafka topic to a Spark in-memory table

query = matchdata \
       .writeStream \
       .format("memory") \
       .queryName("PositionTable") \
       .outputMode("append") \
       .start()

query.awaitTermination(5)

# Create a new dataframe after stream completes:

tmp_df=spark.sql("select * from PositionTable")

# Write data to a different Kafka topic

tmp_df \
     .write \
     .format("kafka") \
     .option("kafka.bootstrap.servers", "localhost:9092") \
     .option("topic", "outstream") \
     .save()

上面的代码按预期工作:在pyspark中读取kafka主题“instream”中的数据,然后pyspark可以将数据写入kafka主题“outstream”。
但是,我希望读取流并立即将转换后的数据写回(流将是无限的,我们希望在数据进入时立即获得见解)。在文档之后,我将上面的查询替换为以下内容:

query = matchdata \
       .writeStream \
       .format("kafka") \
       .option("kafka.bootstrap.servers", "localhost:9092") \
       .option("topic", "outstream") \
       .option("checkpointLocation", "/path/to/HDFS/dir") \
       .start()

这似乎不起作用。没有错误信息,所以我不知道是什么错误。我也尝试过在windows中开窗和聚合,但也不起作用。任何建议都将不胜感激!

yzuktlbb

yzuktlbb1#

好的,我发现了问题。主要原因是子目录“path/to/hdfs/dir”必须存在。创建该目录后,代码按预期运行。如果有一条错误消息沿着这些思路陈述一些东西,那就太好了。

相关问题