向主题发送消息时出错

ryhaxcpt  于 2021-06-08  发布在  Kafka
关注(0)|答案(13)|浏览(436)

在Kafka中生成消息时,我收到以下错误:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nil_PF1_P1
hi
hello

[2016-07-19 17:06:34,542] ERROR Error when sending message to topic nil_PF1_P1 with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
[2016-07-19 17:07:34,544] ERROR Error when sending message to topic nil_PF1_P1 with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic nil_PF1_P1
Topic:nil_PF1_P1    PartitionCount:1    ReplicationFactor:1 Configs:
Topic: nil_PF1_P1   Partition: 0    Leader: 2   Replicas: 2 Isr: 2

有什么想法吗??

xtfmy6hx

xtfmy6hx1#

有这个问题:使用HortonWorksHDP2.5。kerberisation已启用
通过提供正确的安全协议和端口进行修复。命令示例:

./kafka-console-producer.sh --broker-list sand01.intranet:6667, san02.intranet:6667, san03.intranet:6667--topic test--security-protocol PLAINTEXTSASL

./kafka-console-consumer.sh --zookeeper sand01:2181 --topic test--from-beginning --security-protocol PLAINTEXTSASL
ckocjqey

ckocjqey2#

对于apache kafka v2.11-1.1.0
启动zookeeper服务器:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

启动kafka服务器:

$ bin/kafka-server-start.sh config/server.properties

创建一个主题名“我的主题”:

$ bin/kafka-topics.sh --create --topic my_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 1

启动生产者:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic

启动使用者:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning
ca1c2owp

ca1c2owp3#

这可能是因为Kafka理论中的一些参数 server.properties 文件。你可以在这里找到更多信息
停止kafka服务器

cd $KAFKA_HOME/bin  
./kafka-server-stop.sh

改变

listeners=PLAINTEXT://hostname:9092


听众=plaintext://0.0.0.0:9092
$KAFKA_HOME/config/server.properties 重新启动kafka服务器

$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
xxls0lw8

xxls0lw84#

我也面临着类似的问题,在那里我可以生产和消费 localhost 但不是来自网络上的不同机器。根据一些答案,我得到了基本上我们需要揭露的线索 advertised.listener 然而,对于生产者和消费者来说,给予0.0.0.0也不起作用。所以给了确切的ip地址
advertised.listeners advertised.listeners=PLAINTEXT://HOST.IP:9092 我就走了 listener=PLAINTEXT://:9092 就这样。
因此,spark将广告中的ip和端口暴露给生产者和消费者

d6kp6zgx

d6kp6zgx5#

而不是改变 server.properties 包括地址 0.0.0.0 在代码本身。而不是

/usr/bin/kafka-console-producer --broker-list Hostname:9092 --topic MyFirstTopic1

使用

/usr/bin/kafka-console-producer --broker-list 0.0.0.0:9092 --topic MyFirstTopic1
tct7dpnv

tct7dpnv6#

我知道这很老了,但这可能适用于正在处理它的其他人:我改变了两件事:
1将“bootstrap.servers”属性或--broker list选项更改为0.0.0.0:9092
2更改(在本例中取消注解并编辑)2个属性中的server.properties
侦听器=plaintext://your.host.name:9092 to listeners=纯文本://:9092
播发的.listeners=plaintext://your.host.name:9092到adverted.listeners=plaintext://localhost:9092

2lpgd968

2lpgd9687#

我今天也犯了同样的错误 confluent_kafka 0.9.2 (0x90200) 以及 librdkafka 0.9.2 (0x90401) . 在我的例子中,我在tutorialpoints示例中指定了错误的代理端口:

$ kafka-console-producer.sh --broker-list localhost:9092 --topic tutorialpoint-basic-ops-01

虽然我的代理是在端口9094上启动的:

$ cat server-02.properties 
broker.id=2
port=9094
log.dirs=/tmp/kafka-example-logs-02
zookeeper.connect=localhost:2181

虽然9092端口没有打开( netstat -tunap ),花了60秒 kafka-console-producer.sh 提出错误。看起来此工具需要修复:
更快地失败
带有更明确的错误消息。

8e2ybdfx

8e2ybdfx8#

我在hortonworks(hdp2.x版本)安装中使用apachekafka。遇到的错误消息意味着kafka生产者无法将数据推送到段日志文件。在命令行控制台中,这意味着两件事:
您使用的代理端口不正确
server.properties中的侦听器配置不工作
如果在通过scalaapi编写时遇到错误消息,请另外使用检查到kafka集群的连接 telnet <cluster-host> <broker-port> 注意:如果您使用scalaapi来创建主题,那么代理需要一些时间来了解新创建的主题。因此,在主题创建之后,生产者可能会立即失败并出现错误 Failed to update metadata after 60000 ms. 为了解决此问题,我进行了以下检查:
第一个不同的是,我通过Ambari检查Kafka经纪人监听端口 6667 在hdp2.x上(apachekafka使用9092)。

listeners=PLAINTEXT://localhost:6667

接下来,使用ip而不是localhost。我处决了 netstat -na | grep 6667 ```
tcp 0 0 192.30.1.5:6667 0.0.0.0:* LISTEN
tcp 1 0 192.30.1.5:52242 192.30.1.5:6667 CLOSE_WAIT
tcp 0 0 192.30.1.5:54454 192.30.1.5:6667 TIME_WAIT

因此,我将producer调用修改为user-the-ip,而不是localhost:

./kafka-console-producer.sh --broker-list 192.30.1.5:6667 --topic rdl_test_2

要监视是否正在写入新记录,请监视 `/kafka-logs` 文件夹。

cd /kafka-logs//
ls -lart
-rw-r--r--. 1 kafka hadoop 0 Feb 10 07:24 00000000000000000000.log
-rw-r--r--. 1 kafka hadoop 10485756 Feb 10 07:24 00000000000000000000.timeindex
-rw-r--r--. 1 kafka hadoop 10485760 Feb 10 07:24 00000000000000000000.index

一旦生产者成功写入段日志文件 `00000000000000000000.log` 会变大。
尺寸如下:

-rw-r--r--. 1 kafka hadoop 10485760 Feb 10 07:24 00000000000000000000.index
-rw-r--r--. 1 kafka hadoop 45Feb 10 09:16 00000000000000000000.log
-rw-r--r--. 1 kafka hadoop 10485756 Feb 10 07:24 00000000000000000000.timeindex

此时,可以运行consumer-console.sh:

./kafka-console-consumer.sh --bootstrap-server 192.30.1.5:6667 --topic rdl_test_2 --from-beginning
response is hello world

在这个步骤之后,如果您想通过scalaapi生成消息,那么更改 `listeners` 值(从localhost到公共ip)并通过ambari重新启动kafka代理:

listeners=PLAINTEXT://192.30.1.5:6667

样品生产商如下:

package com.scalakafka.sample
import java.util.Properties
import java.util.concurrent.TimeUnit

import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
import org.apache.kafka.common.serialization.{StringSerializer, StringDeserializer}

class SampleKafkaProducer {
case class KafkaProducerConfigs(brokerList: String = "192.30.1.5:6667") {
val properties = new Properties()
val batchsize :java.lang.Integer = 1

properties.put("bootstrap.servers", brokerList)
properties.put("key.serializer", classOf[StringSerializer])
properties.put("value.serializer", classOf[StringSerializer])
//    properties.put("serializer.class", classOf[StringDeserializer])
    properties.put("batch.size", batchsize)
//    properties.put("linger.ms", 1)
//    properties.put("buffer.memory", 33554432)

}

val producer = new KafkaProducerString, String

def produce(topic: String, messages: Iterable[String]): Unit = {
messages.foreach { m =>
println(s"Sending $topic and message is $m")
val result = producer.send(new ProducerRecord(topic, m)).get()
println(s"the write status is ${result}")
}
producer.flush()
producer.close(10L, TimeUnit.MILLISECONDS)
}
}

希望这对别人有帮助。
7uzetpgm

7uzetpgm9#

在主题后添加这样一行有助于解决同一问题:--主题--属性“parse.key=true”--属性“key.separator=:”
希望这对别人有帮助。

zvokhttg

zvokhttg10#

如果您正在运行hortonworks集群,请检查ambari中的侦听端口。
在我的情况下,9092不是我的港口。我去了ambari,发现监听端口设置为6667,这对我很有用。:)

j7dteeu8

j7dteeu811#

在我的例子中,我使用kafka docker和openshift。我也遇到了同样的问题。当我传递环境变量时它被修复了 KAFKA_LISTENERS 值为 PLAINTEXT://:9092 . 这将最终添加并创建一个条目 listeners=PLAINTEXT://:9092 在server.properties下。
侦听器不必有主机名。

cu6pst1q

cu6pst1q12#

我面临上述例外。我调查并找到了根本原因。当我建立带有两个节点的kafka群集时,我遇到了这个问题。在server.properties中使用以下设置。在这里,我将kafka节点1和2的server.properties表示为broker1.properties和broker2.properties
broker1.properties设置

listeners=PLAINTEXT://A.B.C.D:9092
    zookeeper.connect=A.B.C.D:2181,E.F.G.H:2181

broker2.properties设置

listeners=PLAINTEXT://E.F.G.H:9092
    zookeeper.connect=A.B.C.D:2181,E.F.G.H:2181

我试图使用以下命令从node1或node2启动producer:./bin/kafka-console-producer.sh——brokerlistlocalhost:9092 --topic 我们的主题和我得到了上面的超时异常stacktrace,尽管kafka在这两台机器上都运行。
虽然制作者要么从领导者节点开始,要么从追随者开始,但我总是得到相同的结果。
当从任何代理使用下面的命令时,我都能得到消息。

./bin/kafka-console-producer.sh --broker-list A.B.C.D:9092 --topic OUR_TOPIC
    or
   ./bin/kafka-console-producer.sh --broker-list E.F.G.H:9092 --topic OUR_TOPIC
   or
   ./bin/kafka-console-producer.sh --broker-list A.B.C.D:9092,E.F.G.H:9092 --topic OUR_TOPIC

所以根本原因是Kafka在内部使用监听器=plaintext://e.f.g.h:9092启动生产者时的属性。此属性必须匹配才能在启动生产者时从任何节点启动kafka代理。正在将此属性转换为侦听器=plaintext://localhost:9092将为我们的第一指挥部工作。

flvtvl50

flvtvl5013#

另一种情况。直到我找到Kafka的日志,上面有以下信息,我才知道发生了什么:

Caused by: java.lang.IllegalArgumentException: Invalid version for API key 3: 2

显然,制作者使用的是比kafka服务器更新的kafka客户端(java),使用的api无效(客户端使用1.1,服务器使用10.0)。关于客户/制作人,我得到:

Error producing to topic Failed to update metadata after 60000 ms.

相关问题