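I downloaded kafka_2.10-0.10.0.1 on both my Windows machine and my Linux machines (I have a cluster of 3 Linux machines, 192.168.80.128/129/130), so I use the Windows machine as the Kafka client and the Linux machines as the Kafka servers. I tried to produce messages from Windows to the remote Kafka server. The command and its output were: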
F:\kafka_2.10-0.10.0.1\kafka_2.10-0.10.0.1\bin\windows>kafka-console-producer.bat --broker-list 192.168.80.128:9092 --topic wuchang
DADFASDF
ASDFASF
[2016-11-08 22:41:30,311] ERROR Error when sending message to topic wuchang with key: null, value: 8 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 2 record(s) expired due to timeout while requesting metadata from brokers for wuchang-0
[2016-11-08 22:41:30,313] ERROR Error when sending message to topic wuchang with key: null, value: 7 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 2 record(s) expired due to timeout while requesting metadata from brokers for wuchang-0
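I am quite sure the Kafka cluster itself is healthy, because the produce and consume commands succeed when I run them directly on the Linux servers.
Unsurprisingly, consuming messages from the remote Kafka server on Windows fails as well: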
F:\kafka_2.10-0.10.0.1\kafka_2.10-0.10.0.1\bin\windows>kafka-console-consumer.bat --bootstrap-server 192.168.80.128:9092 --topic wuchang --from-beginning --zookeeper 192.168.80.128:2181
[2016-11-08 22:56:43,486] WARN Fetching topic metadata with correlation id 0 for topics [Set(wuchang)] from broker [BrokerEndPoint(1,vm02,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
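Note that the warning above shows the client contacting the broker as vm02:9092, not by the IP address given on the command line. One thing that may be worth checking from the Windows machine is whether that hostname resolves there and whether the port accepts connections. The following is only a rough sketch of such a check: the class name BrokerReachability and the method canReach are made up for illustration, it uses plain java.net sockets rather than the Kafka client, and a passing or failing result does not by itself prove what is causing the errors.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

// Hypothetical helper (not part of Kafka): checks name resolution and TCP reachability.
class BrokerReachability {

    // Returns true if host resolves and a TCP connection to host:port
    // succeeds within timeoutMs milliseconds; otherwise prints why and returns false.
    static boolean canReach(String host, int port, int timeoutMs) {
        InetAddress address;
        try {
            address = InetAddress.getByName(host);
        } catch (UnknownHostException e) {
            System.out.println(host + " does not resolve on this machine");
            return false;
        }
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(address, port), timeoutMs);
            return true;
        } catch (IOException e) {
            System.out.println(host + ":" + port + " resolved to "
                    + address.getHostAddress() + " but is not reachable: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        // Address passed on the command line in the question.
        System.out.println("192.168.80.128:9092 reachable: "
                + canReach("192.168.80.128", 9092, 3000));
        // Hostname that appears in the BrokerEndPoint(1,vm02,9092) warning.
        System.out.println("vm02:9092 reachable: " + canReach("vm02", 9092, 3000));
    }
}
```

If the IP address is reachable but vm02 is not, that would at least be consistent with the client failing after it receives broker metadata, though other causes remain possible.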
I also tried a Kafka Java API example on my Windows machine. It fails too, and without any error message. My Java code is:
package com.netease.ecom.data.connect.hdfs;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SimpleAvroProducer {

    public static final String USER_SCHEMA = "{"
            + "\"type\":\"record\","
            + "\"name\":\"myrecord\","
            + "\"fields\":["
            + " { \"name\":\"str1\", \"type\":\"string\" },"
            + " { \"name\":\"str2\", \"type\":\"string\" },"
            + " { \"name\":\"int1\", \"type\":\"int\" }"
            + "]}";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.80.128:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 1000; i++) {
            GenericData.Record avroRecord = new GenericData.Record(schema);
            avroRecord.put("str1", "Str 1-" + i);
            avroRecord.put("str2", "Str 2-" + i);
            avroRecord.put("int1", i);

            byte[] bytes = recordInjection.apply(avroRecord);
            ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
            producer.send(record);
            Thread.sleep(250);
        }
        producer.close();
    }
}
Yes, my code is meant to send Avro data to Kafka, and it fails without reporting any error.
One of the Kafka server.properties files on my Linux machines is: