Failed to update metadata after ... ms | Java application cannot fetch metadata from Kafka

Posted by wgmfuz8q on 2021-06-06 in Kafka

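I am trying to create a producer in Java that sends a message to a topic on a Kafka broker. I can send messages to the topic from the console, i.e. with kafka-console-producer.sh. However, when I try to do the same with the producer created in Java, I get a timeout exception whose message says that metadata could not be fetched after 100000 ms. I am attaching the producer code and Kafka's server.properties below.
getProducer():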

private synchronized Producer<String, String> getProducer() {
    if (!producer.isPresent()) {
        Properties prodProps = new Properties();
        prodProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092,127.0.0.1:9092");
        prodProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 100000);
        prodProps.put(ProducerConfig.ACKS_CONFIG, "all");
        prodProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prodProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        setProducer(new KafkaProducer<>(prodProps));
    }
    return producer.get();
}
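Since the console producer works while the Java producer times out waiting for metadata, one thing worth ruling out is whether every address in the bootstrap list is reachable from the JVM that runs the application. The following is a minimal sketch using only the JDK; the class `BootstrapCheck` and its methods are hypothetical helpers and not part of the Kafka client API. An open TCP port only shows that something is listening there, not that the broker will answer metadata requests.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper; not part of the Kafka client library.
final class BootstrapCheck {

    // Splits a bootstrap.servers value such as "localhost:9092,127.0.0.1:9092"
    // into unresolved host/port pairs. IPv6 literals are not handled here.
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    // Returns true if a TCP connection can be opened within timeoutMs.
    static boolean isReachable(InetSocketAddress address, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // Resolve the host name now, then attempt the connection.
            socket.connect(new InetSocketAddress(address.getHostString(), address.getPort()), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Same value as BOOTSTRAP_SERVERS_CONFIG in getProducer().
        for (InetSocketAddress address : parse("localhost:9092,127.0.0.1:9092")) {
            System.out.println(address.getHostString() + ":" + address.getPort()
                    + " reachable=" + isReachable(address, 2000));
        }
    }
}
```

If one of the entries is reported as unreachable, the problem probably lies in networking or host name resolution rather than in the producer configuration itself.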

publishPayload():

private Future<RecordMetadata> publishPayload(DataObject dataObject) {
    String topic = //topic name;
    String key = // unique ID;
    String payload = // String payload;
    return getProducer().send(new ProducerRecord<>(topic, key, payload));
}
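`send()` returns a `Future`, so a failure that happens after the call has returned may only become visible once that future is inspected. As a sketch of how the calling code might wait with a bound and unwrap the cause: the helper `SendResult.await` is a hypothetical name, and a plain `CompletableFuture` stands in for the `Future<RecordMetadata>` returned by `publishPayload` so that the example runs without a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper; works with any Future, including the one returned by send().
final class SendResult {

    // Waits up to timeoutMs for the future and describes the outcome as a string.
    static <T> String await(Future<T> future, long timeoutMs) {
        try {
            T value = future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "ok: " + value;
        } catch (ExecutionException e) {
            // The real failure is wrapped; the cause carries the actual exception.
            return "failed: " + e.getCause();
        } catch (TimeoutException e) {
            return "timed out after " + timeoutMs + " ms";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Simulated outcomes standing in for a real producer.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("simulated metadata timeout"));

        System.out.println(await(CompletableFuture.completedFuture("record stored"), 1000));
        System.out.println(await(failed, 1000));
        System.out.println(await(new CompletableFuture<String>(), 50));
    }
}
```

With the real producer the call would look like `SendResult.await(publishPayload(dataObject), 5000)`, where the 5000 ms bound is an arbitrary choice for illustration.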

server.properties:


# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

listeners=PLAINTEXT://127.0.0.1:9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092

# The port the socket server listens on

port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=127.0.0.1

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().

advertised.host.name=127.0.0.1

# The port to publish to ZooKeeper for clients to use. If this is not set,

# it will publish the same port that the broker binds to.

advertised.port=9092

# The number of threads handling network requests

num.network.threads=3

# The number of threads doing disk I/O

num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server

socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server

socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=204857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

# Zookeeper connection string (see zookeeper docs for details).

# This is a comma separated host:port pairs, each corresponding to a zk

# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".

# You can also append an optional chroot string to the urls to specify the

# root directory for all kafka znodes.

zookeeper.connect=localhost:2181
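The broker hands the addresses from `advertised.listeners` back to clients in its metadata responses, so the client has to be able to reach those addresses and not only the bootstrap address. As a rough sketch of how the advertised endpoints could be read out of a broker configuration for comparison with the client's bootstrap list: `ListenerConfig` is a hypothetical helper, and the properties text is inlined here instead of being read from the real file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper; not part of Kafka.
final class ListenerConfig {

    // Maps listener name to host:port for a value such as
    // "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093".
    static Map<String, String> parseListeners(String value) {
        Map<String, String> result = new LinkedHashMap<>();
        if (value == null) {
            return result;
        }
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            int sep = trimmed.indexOf("://");
            if (sep < 0) {
                continue; // skip entries that are not in NAME://host:port form
            }
            result.put(trimmed.substring(0, sep), trimmed.substring(sep + 3));
        }
        return result;
    }

    // Loads properties text and returns the parsed advertised.listeners entry.
    static Map<String, String> advertised(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return parseListeners(props.getProperty("advertised.listeners"));
    }

    public static void main(String[] args) {
        // Excerpt of the two listener lines from the configuration in this post.
        String excerpt = "listeners=PLAINTEXT://127.0.0.1:9092\n"
                + "advertised.listeners=PLAINTEXT://127.0.0.1:9092\n";
        System.out.println(advertised(excerpt));
    }
}
```

For the configuration shown here the advertised endpoint is 127.0.0.1:9092, which is also one of the two bootstrap addresses used in getProducer().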

producer.properties:


# format: host1:port1,host2:port2 ...

metadata.broker.list=localhost:9092

# name of the partitioner class for partitioning events; default partition spreads data randomly
# partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync

# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none

# message encoder

serializer.class=kafka.serializer.DefaultEncoder
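The keys in this file (`metadata.broker.list`, `producer.type`, `compression.codec`, `serializer.class`) are the names used by the older Scala producer, whereas the `KafkaProducer` built in getProducer() is configured through `ProducerConfig` keys such as `bootstrap.servers`; the Java code shown above also never loads this file. A small sketch that flags such legacy keys in a `Properties` object follows. The class `LegacyProducerKeys` is hypothetical, and its mapping is a hand-written illustration, not an official migration table.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper; the mapping below is illustrative only.
final class LegacyProducerKeys {

    private static final Map<String, String> REPLACEMENTS = new LinkedHashMap<>();
    static {
        REPLACEMENTS.put("metadata.broker.list", "bootstrap.servers");
        REPLACEMENTS.put("compression.codec", "compression.type");
        REPLACEMENTS.put("serializer.class", "key.serializer / value.serializer");
        REPLACEMENTS.put("producer.type", "(no direct equivalent; send() is asynchronous)");
    }

    // Returns every legacy key present in props, mapped to its suggested replacement.
    static Map<String, String> findLegacy(Properties props) {
        Map<String, String> found = new TreeMap<>();
        for (String name : props.stringPropertyNames()) {
            String replacement = REPLACEMENTS.get(name);
            if (replacement != null) {
                found.put(name, replacement);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Values copied from the producer.properties in this post.
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "localhost:9092");
        props.setProperty("producer.type", "sync");
        props.setProperty("compression.codec", "none");
        props.setProperty("serializer.class", "kafka.serializer.DefaultEncoder");
        findLegacy(props).forEach((key, replacement) ->
                System.out.println(key + " -> " + replacement));
    }
}
```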

Please let me know if I have missed anything here, and how to get the Java producer to communicate with the Kafka topic.
