我的Dataframe df
看起来像
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
我为流式查询创建了一个kafka接收器,但没有收到kafka的任何消息。为什么?
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()
1条答案
按热度按时间rt4zxlrg1#
你将不会从Kafka那里收到任何东西,因为根据你的代码,你正在尝试选择列
key
以及value
从只有列的Dataframeage
以及name
. 您需要选择如下所示。而且,你不需要
writeStream
如果Dataframe是静态的。那样的话,你需要申请write
以及save
.如果要将数据存储到json字符串中,可以应用以下命令“