Kafka: producing or consuming messages from a Windows client to a remote Linux server fails

0g0grzrc · published 2021-06-07 in Kafka

I downloaded kafka_2.10-0.10.0.1 onto both my Windows machine and my Linux machines (I have a cluster of three Linux machines: 192.168.80.128/129/130). I use the Windows machine as the Kafka client and the Linux machines as the Kafka servers. I tried to produce messages from Windows to the remote Kafka brokers; the command and its output are:
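Before looking at Kafka itself, it is worth confirming raw TCP reachability from the Windows box to the broker port; a quick check in PowerShell (assuming the default broker port 9092) would look like:

```
Test-NetConnection 192.168.80.128 -Port 9092
```

If `TcpTestSucceeded` is `False`, the problem is a firewall or routing issue rather than Kafka configuration.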

F:\kafka_2.10-0.10.0.1\kafka_2.10-0.10.0.1\bin\windows>kafka-console-producer.bat --broker-list 192.168.80.128:9092 --topic wuchang
DADFASDF
ASDFASF
[2016-11-08 22:41:30,311] ERROR Error when sending message to topic wuchang with key: null, value: 8 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 2 record(s) expired due to timeout while requesting metadata from brokers for wuchang-0
[2016-11-08 22:41:30,313] ERROR Error when sending message to topic wuchang with key: null, value: 7 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 2 record(s) expired due to timeout while requesting metadata from brokers for wuchang-0
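A metadata timeout from a remote client, while local produce/consume works, often means the broker advertises a hostname that only resolves inside the cluster (the consumer warning later in this post shows the broker registering itself as vm02). A minimal sketch of the relevant server.properties keys in Kafka 0.10.x, assuming clients should reach the broker by IP:

```
# server.properties on 192.168.80.128 (assumption: clients connect by IP)
listeners=PLAINTEXT://0.0.0.0:9092
# advertised.listeners is what the broker hands back in metadata responses;
# if it resolves only inside the cluster (e.g. vm02), remote clients time out.
advertised.listeners=PLAINTEXT://192.168.80.128:9092
```

Alternatively, adding a `vm02`-to-IP mapping in the Windows hosts file would let the advertised hostname resolve.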

I am quite sure my Kafka cluster itself is healthy, because running the same produce and consume commands directly on the Linux servers succeeds.
Of course, consuming messages from the remote Kafka server fails as well:

F:\kafka_2.10-0.10.0.1\kafka_2.10-0.10.0.1\bin\windows>kafka-console-consumer.bat --bootstrap-server 192.168.80.128:9092 --topic wuchang --from-beginning --zookeeper 192.168.80.128:2181
[2016-11-08 22:56:43,486] WARN Fetching topic metadata with correlation id 0 for topics [Set(wuchang)] from broker [BrokerEndPoint(1,vm02,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
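As an aside, in 0.10.x the console consumer picks the old Zookeeper-based consumer whenever --zookeeper is given, so the --bootstrap-server flag in the command above is effectively ignored (which is why the stack trace goes through kafka.consumer and Zookeeper-era classes). A sketch of the two separate invocations, assuming the same topic and hosts:

```
:: old consumer (Zookeeper-based) - what actually ran above
kafka-console-consumer.bat --zookeeper 192.168.80.128:2181 --topic wuchang --from-beginning

:: new consumer (broker-based)
kafka-console-consumer.bat --new-consumer --bootstrap-server 192.168.80.128:9092 --topic wuchang --from-beginning
```

Note that BrokerEndPoint(1,vm02,9092) in the warning shows the broker advertising itself as vm02, a hostname the Windows client likely cannot resolve.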

I also tried the Kafka Java API example on my Windows machine, and it failed too, without any error message. My Java code is:

package com.netease.ecom.data.connect.hdfs;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SimpleAvroProducer {

    public static final String USER_SCHEMA = "{"
            + "\"type\":\"record\","
            + "\"name\":\"myrecord\","
            + "\"fields\":["
            + "  { \"name\":\"str1\", \"type\":\"string\" },"
            + "  { \"name\":\"str2\", \"type\":\"string\" },"
            + "  { \"name\":\"int1\", \"type\":\"int\" }"
            + "]}";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.80.128:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 1000; i++) {
            GenericData.Record avroRecord = new GenericData.Record(schema);
            avroRecord.put("str1", "Str 1-" + i);
            avroRecord.put("str2", "Str 2-" + i);
            avroRecord.put("int1", i);

            byte[] bytes = recordInjection.apply(avroRecord);

            ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
            producer.send(record);

            Thread.sleep(250);

        }

        producer.close();
    }
}
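One reason the program can end with no error at all is that KafkaProducer.send() is asynchronous: failures surface only in a callback or when a batch expires, and a short-lived main() may exit before either happens. A minimal sketch (my own probe, not the original code) that makes the error visible, assuming the same bootstrap server and a plain string payload:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProbeProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.80.128:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Fail fast instead of hanging: cap how long metadata fetches may block.
        props.put("max.block.ms", "10000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("wuchang", "probe"),
                    (metadata, exception) -> {
                        // The callback is the only place async send errors show up.
                        if (exception != null) {
                            exception.printStackTrace();
                        } else {
                            System.out.println("sent to " + metadata.topic()
                                    + "-" + metadata.partition());
                        }
                    });
            producer.flush(); // block until the send completes or fails
        }
    }
}
```

With a callback and flush() in place, the same TimeoutException seen in the console producer should be printed instead of being silently dropped.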

Yes, my code tries to send Avro data to Kafka, and it fails without any error.
One of the kafka server.properties files on my Linux machines is:
