我在用Kafka 1.0.1-kafka-3.1.0-SNAPSHOT
来自cdh(hadoop的cloudera发行版)
在我的batch-1边缘服务器上,我可以生成具有以下内容的消息:
kafka-console-producer --broker-list batch-1:9092 --topic MyTopic
感谢zookeeper在我的第一个节点上提供了以下功能,我可以使用这些信息:
kafka-console-consumer --zookeeper data1:2181 --topic MyTopic --from-beginning
但我没有得到任何引导服务器选项:
kafka-console-consumer --bootstrap-server batch-1:9092 --topic MyTopic --from-beginning
问题是我把Kafka用在了spark上: libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"
```
val df = spark.readStream
.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.option("kafka.bootstrap.servers", "batch-1:9092")
.option("subscribe", "MyTopic")
.load()
println("Select :")
val df2 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)")
.as[(String, String, String)]
println("Show :")
val query = df2.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
我做了一个 `export SPARK_KAFKA_VERSION=0.10` 在我的边缘。然后
spark2-submit --driver-memory 2G --jars spark-sql-kafka-0-10_2.11-2.3.0.cloudera4.jar --class "spark.streaming.Poc" poc_spark_kafka_2.11-0.0.1.jar
这迫使我使用 `kafka.bootstrap.servers` ,似乎有联系,但我没有收到任何信息。
输出与 `kafka-console-consumer` 与 `--bootstrap-server` 选项:
18/10/30 16:11:48 INFO utils.AppInfoParser: Kafka version : 0.10.0-kafka-2.1.0
18/10/30 16:11:48 INFO utils.AppInfoParser: Kafka commitId : unknown
18/10/30 16:11:48 INFO streaming.MicroBatchExecution: Starting new streaming query.
然后,什么也没有。我要接Zookeeper吗?怎样?
他们在这里说的“结构化流媒体+Kafka集成指南(Kafka代理版本0.10.0或更高版本)”是否存在版本冲突:https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html ?
我错过了什么?
1条答案
按热度按时间aurhwmvo1#
解决方案
这个
/var/log/kafka/kafka-broker-batch-1.log
说:2018-10-31 13:40:08,284 ERROR kafka.server.KafkaApis: [KafkaApi-51] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet.
因此,我在集群节点上部署了3个代理,边缘上有一个网关,现在可以使用:kafka-console-producer --broker-list data1:9092,data2:9092,data3:9092 --topic Test
kafka-console-consumer --bootstrap-server data1:9092 --topic Test --from-beginning
spark也很好用。