在pyspark中,我可以通过两个不同的步骤从一个kafka主题中读取流,并将(转换后的)数据写回另一个kafka主题。代码如下:
# Define Stream:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "instream") \
.load()
# Transform
matchdata = df.select(from_json(F.col("value").cast("string"),schema).alias("value"))\
.select(F.col('value').cast("string"))
# Stream the data, from a Kafka topic to a Spark in-memory table
query = matchdata \
.writeStream \
.format("memory") \
.queryName("PositionTable") \
.outputMode("append") \
.start()
query.awaitTermination(5)
# Create a new dataframe after stream completes:
tmp_df=spark.sql("select * from PositionTable")
# Write data to a different Kafka topic
tmp_df \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "outstream") \
.save()
上面的代码按预期工作:在pyspark中读取kafka主题“instream”中的数据,然后pyspark可以将数据写入kafka主题“outstream”。
但是,我希望读取流并立即将转换后的数据写回(流将是无限的,我们希望在数据进入时立即获得见解)。在文档之后,我将上面的查询替换为以下内容:
query = matchdata \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("topic", "outstream") \
.option("checkpointLocation", "/path/to/HDFS/dir") \
.start()
这似乎不起作用。没有错误信息,所以我不知道是什么错误。我也尝试过在windows中开窗和聚合,但也不起作用。任何建议都将不胜感激!
1条答案
按热度按时间yzuktlbb1#
好的,我发现了问题。主要原因是子目录“path/to/hdfs/dir”必须存在。创建该目录后,代码按预期运行。如果有一条错误消息沿着这些思路陈述一些东西,那就太好了。