我有多个服务器,从中生成消息,我需要一个服务器上的代理和使用者。若生产者和消费者都在同一台服务器上运行,那个么它运行得很好,但不确定需要做什么更改来保持生产者的分离。我不想任何zookeeper和kafka服务器在生产者服务器的依赖,因为有很多,他们会增加。在设置kafkaproducer时,我尝试将引导服务器更改为代理/消费者服务器,如192.168.0.1:9092,但仍然无法生成消息。不知道我错过了什么,请帮帮我。我也跟着https://github.com/mapr-demos/kafka-sample-programs 对于代码。
尝试在同一台服务器上同时运行生产者和消费者,效果很好。
生产者.java
public class Producer {
public static void main(String[] args) throws IOException {
// set up the producer
KafkaProducer<String, String> producer;
try (InputStream props = Resources.getResource("producer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
producer = new KafkaProducer<>(properties);
}
try {
for (int i = 0; i < 1000000; i++) {
// send lots of messages
producer.send(new ProducerRecord<String, String>(
"fast-messages",
String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));
// every so often send to a different topic
if (i % 1000 == 0) {
producer.send(new ProducerRecord<String, String>(
"fast-messages",
String.format("{\"type\":\"marker\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));
producer.send(new ProducerRecord<String, String>(
"summary-markers",
String.format("{\"type\":\"other\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));
producer.flush();
System.out.println("Sent msg number " + i);
}
}
} catch (Throwable throwable) {
System.out.printf("%s", throwable.getStackTrace());
} finally {
producer.close();
}
}
prodcuer.道具
bootstrap.servers=192.168.0.1:9092
acks=all
retries=0
batch.size=16384
auto.commit.interval.ms=1000
linger.ms=0
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
block.on.buffer.full=true
消费者.java
public class Consumer {
public static void main(String[] args) throws IOException {
// set up house-keeping
ObjectMapper mapper = new ObjectMapper();
Histogram stats = new Histogram(1, 10000000, 2);
Histogram global = new Histogram(1, 10000000, 2);
// and the consumer
KafkaConsumer<String, String> consumer;
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
if (properties.getProperty("group.id") == null) {
properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
}
consumer = new KafkaConsumer<>(properties);
}
consumer.subscribe(Arrays.asList("fast-messages", "summary-markers"));
int timeouts = 0;
//noinspection InfiniteLoopStatement
while (true) {
// read records with a short timeout. If we time out, we don't really care.
ConsumerRecords<String, String> records = consumer.poll(200);
if (records.count() == 0) {
timeouts++;
} else {
System.out.printf("Got %d records after %d timeouts\n", records.count(), timeouts);
timeouts = 0;
}
for (ConsumerRecord<String, String> record : records) {
switch (record.topic()) {
case "fast-messages":
// the send time is encoded inside the message
JsonNode msg = mapper.readTree(record.value());
switch (msg.get("type").asText()) {
case "test":
long latency = (long) ((System.nanoTime() * 1e-9 - msg.get("t").asDouble()) * 1000);
stats.recordValue(latency);
global.recordValue(latency);
break;
case "marker":
// whenever we get a marker message, we should dump out the stats
// note that the number of fast messages won't necessarily be quite constant
System.out.printf("%d messages received in period, latency(min, max, avg, 99%%) = %d, %d, %.1f, %d (ms)\n",
stats.getTotalCount(),
stats.getValueAtPercentile(0), stats.getValueAtPercentile(100),
stats.getMean(), stats.getValueAtPercentile(99));
System.out.printf("%d messages received overall, latency(min, max, avg, 99%%) = %d, %d, %.1f, %d (ms)\n",
global.getTotalCount(),
global.getValueAtPercentile(0), global.getValueAtPercentile(100),
global.getMean(), global.getValueAtPercentile(99));
stats.reset();
break;
default:
throw new IllegalArgumentException("Illegal message type: " + msg.get("type"));
}
break;
case "summary-markers":
break;
default:
throw new IllegalStateException("Shouldn't be possible to get message on topic " + record.topic());
}
}
}
}
}
消费者.道具
bootstrap.servers=192.168.0.1:9092
group.id=test
enable.auto.commit=true
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# fast session timeout makes it more fun to play with failover
session.timeout.ms=10000
# These buffer sizes seem to be needed to avoid consumer switching to
# a mode where it processes one bufferful every 5 seconds with multiple
# timeouts along the way. No idea why this happens.
fetch.min.bytes=50000
receive.buffer.bytes=262144
max.partition.fetch.bytes=2097152
1条答案
按热度按时间x8diyxa71#
当生产者不在代理机器上时,到底会发生什么?您是否看到任何日志或错误消息?您没有描述您的设置,但是ip是192.168.0.1(代理机器)并且端口9092是否对外开放(检查iptables)?
另一件事:如果生产者和消费者不在同一台机器上,上面的代码将不会给您有意义的结果。你用
System.nanoTime()
测量延迟。但根据官方文件:此方法只能用于测量经过的时间,与系统或挂钟时间的任何其他概念无关。返回的值表示自某个固定但任意的起始时间(可能在将来,因此值可能为负)起的纳秒数。在java虚拟机的示例中,该方法的所有调用都使用相同的来源;其他虚拟机示例可能使用不同的源。