我正在开发结构化的sparkstream应用程序,经过分析,我最终将输出写入kafka。Dataframe的架构是
root
|-- key: string (nullable = true)
|-- value: string (nullable = true)
写给Kafka的代码是
query = testDataFrame \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "127.0.0.1:9092") \
.option("topic", output_topic) \
.option("checkpointLocation", ./tmp") \
.start()
执行此代码后,我没有收到任何错误,不幸的是,当我从终端运行kafka consumer命令时,我没有收到任何错误:
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic output_topic --from-beginning
请注意,Kafka和zookeeper都在本地运行。
我用来提交项目的命令:
bin/spark-submit --master local --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.3.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,org.apache.kafka:kafka-clients:1.0.0 pathtoPythonClass
任何我在哪里做的错误或任何类型的帮助的想法将高度赞赏。
暂无答案!
目前还没有任何答案,快来回答吧!