我用的是Kafka2.11-0.10.2.1,zookeeper是Kafka软件包中的那个。配置是一个代理,一个分区,没有复制。首先,我在自己的机器上测试我的代码,测试是可以的,生产者和消费者都可以很好地工作,但当我在服务器(27.57.100.)上部署kafka,在机器(27.57.101.)上部署我正在运行的代码时,局域网中的所有机器都可以互相访问。我的代码与Kafka网站上的示例相同:
public class MyProducer {
private static Properties props = new Properties();
private static String topic = "test11";
static {
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
}
public void produceMessage() throws InterruptedException {
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i=0; i<1000; i++){
producer.send(new ProducerRecord<String, String>(
topic, Integer.toString(i), Integer.toString(i)));
System.out.println(i);
Thread.sleep(100);
}
producer.close();
}
public static void main(String[] args) throws InterruptedException {
MyProducer myProducer = new MyProducer();
myProducer.produceMessage();
}
}
问题很奇怪,没有错误消息,当我第一次启动producer程序时,代理可以创建主题目录,但是.log文件的大小总是0,输出需要30-40秒来打印每个i的值;当我第二次启动producer程序时,输出会正常打印i的值,但不会存储该值。我可以在shell中创建主题、发送消息和使用消息。防火墙已关闭。我需要你的帮助。非常感谢你!
暂无答案!
目前还没有任何答案,快来回答吧!