不同服务器上的apache kafka生产者和消费者

6mw9ycah  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(529)

我有多个服务器,从中生成消息,我需要一个服务器上的代理和使用者。若生产者和消费者都在同一台服务器上运行,那个么它运行得很好,但不确定需要做什么更改来保持生产者的分离。我不想任何zookeeper和kafka服务器在生产者服务器的依赖,因为有很多,他们会增加。在设置kafkaproducer时,我尝试将引导服务器更改为代理/消费者服务器,如192.168.0.1:9092,但仍然无法生成消息。不知道我错过了什么,请帮帮我。我也跟着https://github.com/mapr-demos/kafka-sample-programs 对于代码。
尝试在同一台服务器上同时运行生产者和消费者,效果很好。
生产者.java

public class Producer {
    public static void main(String[] args) throws IOException {
        // set up the producer
        KafkaProducer<String, String> producer;
        try (InputStream props = Resources.getResource("producer.props").openStream()) {
            Properties properties = new Properties();
            properties.load(props);
            producer = new KafkaProducer<>(properties);
        }

        try {
            for (int i = 0; i < 1000000; i++) {
                // send lots of messages
                producer.send(new ProducerRecord<String, String>(
                        "fast-messages",
                        String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));

                // every so often send to a different topic
                if (i % 1000 == 0) {
                    producer.send(new ProducerRecord<String, String>(
                            "fast-messages",
                            String.format("{\"type\":\"marker\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));
                    producer.send(new ProducerRecord<String, String>(
                            "summary-markers",
                            String.format("{\"type\":\"other\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));
                    producer.flush();
                    System.out.println("Sent msg number " + i);
                }
            }
        } catch (Throwable throwable) {
            System.out.printf("%s", throwable.getStackTrace());
        } finally {
            producer.close();
        }

    }

prodcuer.道具

bootstrap.servers=192.168.0.1:9092
acks=all
retries=0
batch.size=16384
auto.commit.interval.ms=1000
linger.ms=0
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
block.on.buffer.full=true

消费者.java

public class Consumer {
    public static void main(String[] args) throws IOException {
        // set up house-keeping
        ObjectMapper mapper = new ObjectMapper();
        Histogram stats = new Histogram(1, 10000000, 2);
        Histogram global = new Histogram(1, 10000000, 2);

        // and the consumer
        KafkaConsumer<String, String> consumer;
        try (InputStream props = Resources.getResource("consumer.props").openStream()) {
            Properties properties = new Properties();
            properties.load(props);
            if (properties.getProperty("group.id") == null) {
                properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
            }
            consumer = new KafkaConsumer<>(properties);
        }
        consumer.subscribe(Arrays.asList("fast-messages", "summary-markers"));
        int timeouts = 0;
        //noinspection InfiniteLoopStatement
        while (true) {
            // read records with a short timeout. If we time out, we don't really care.
            ConsumerRecords<String, String> records = consumer.poll(200);
            if (records.count() == 0) {
                timeouts++;
            } else {
                System.out.printf("Got %d records after %d timeouts\n", records.count(), timeouts);
                timeouts = 0;
            }
            for (ConsumerRecord<String, String> record : records) {
                switch (record.topic()) {
                    case "fast-messages":
                        // the send time is encoded inside the message
                        JsonNode msg = mapper.readTree(record.value());
                        switch (msg.get("type").asText()) {
                            case "test":
                                long latency = (long) ((System.nanoTime() * 1e-9 - msg.get("t").asDouble()) * 1000);
                                stats.recordValue(latency);
                                global.recordValue(latency);
                                break;
                            case "marker":
                                // whenever we get a marker message, we should dump out the stats
                                // note that the number of fast messages won't necessarily be quite constant
                                System.out.printf("%d messages received in period, latency(min, max, avg, 99%%) = %d, %d, %.1f, %d (ms)\n",
                                        stats.getTotalCount(),
                                        stats.getValueAtPercentile(0), stats.getValueAtPercentile(100),
                                        stats.getMean(), stats.getValueAtPercentile(99));
                                System.out.printf("%d messages received overall, latency(min, max, avg, 99%%) = %d, %d, %.1f, %d (ms)\n",
                                        global.getTotalCount(),
                                        global.getValueAtPercentile(0), global.getValueAtPercentile(100),
                                        global.getMean(), global.getValueAtPercentile(99));

                                stats.reset();
                                break;
                            default:
                                throw new IllegalArgumentException("Illegal message type: " + msg.get("type"));
                        }
                        break;
                    case "summary-markers":
                        break;
                    default:
                        throw new IllegalStateException("Shouldn't be possible to get message on topic " + record.topic());
                }
            }
        }
    }
}

消费者.道具

bootstrap.servers=192.168.0.1:9092
group.id=test
enable.auto.commit=true
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# fast session timeout makes it more fun to play with failover

session.timeout.ms=10000

# These buffer sizes seem to be needed to avoid consumer switching to

# a mode where it processes one bufferful every 5 seconds with multiple

# timeouts along the way.  No idea why this happens.

fetch.min.bytes=50000
receive.buffer.bytes=262144
max.partition.fetch.bytes=2097152
x8diyxa7

x8diyxa71#

当生产者不在代理机器上时,到底会发生什么?您是否看到任何日志或错误消息?您没有描述您的设置,但是ip是192.168.0.1(代理机器)并且端口9092是否对外开放(检查iptables)?
另一件事:如果生产者和消费者不在同一台机器上,上面的代码将不会给您有意义的结果。你用 System.nanoTime() 测量延迟。但根据官方文件:
此方法只能用于测量经过的时间,与系统或挂钟时间的任何其他概念无关。返回的值表示自某个固定但任意的起始时间(可能在将来,因此值可能为负)起的纳秒数。在java虚拟机的示例中,该方法的所有调用都使用相同的来源;其他虚拟机示例可能使用不同的源。

相关问题