如何知道从kafka队列中消耗了多少数据以及kafka队列中存在哪些数据主题?

kg7wmglp  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(551)

我在“topicdemo”kafka中生成和使用数据,以实现分布式数据并并行处理这些数据。
但在实时场景中,当前需要监视队列中有多少数据(topicdemo)以及从中消耗了多少数据(topicdemo)。
有没有Kafkaapi可以提供这些细节?
这是我正在生成数据的代码

// create instance for properties to access producer configs
        Properties props = new Properties();

        // props.put("serializer.class",
        // "kafka.serializer.StringEncoder");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("metadata.broker.list", "localhost:9092");

        props.put("producer.type", "async");
        props.put("batch.size", "500");
        props.put("compression.codec", "1");
        props.put("compression.topic", "topicdemo");
        // props.put("key.serializer",
        // "org.apache.kafka.common.serialization.StringSerializer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        org.apache.kafka.clients.producer.Producer<Integer, byte[]> producer = new KafkaProducer<Integer, byte[]>(
                props);

            producer.send(new ProducerRecord<Integer, byte[]>("topicdemo", resume.getBytes()));
        producer.close();

这是我正在使用的数据代码

String topicName = "topicDemo";

    Properties props = new Properties();

    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

    // Kafka Consumer subscribes list of topics here.
    consumer.subscribe(Arrays.asList(topicName));

    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(5);
            for (ConsumerRecord<String, String> record : records) {
                Consumer cons = new Consumer();
                if (cons.SaveDetails(record.value()) == 1) {
                    consumer.commitSync();
                }
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        consumer.close();
    }
q5iwbnjs

q5iwbnjs1#

执行以下命令以检查为每个分区生成的消息数:
bin/kafka-run-class.sh kafka.tools.getoffsetshell—代理列表host:port --topic 主题名称--时间-1
执行以下命令以检查使用者已消耗和滞后的消息数:
bin/kafka-consumer-groups.sh--引导服务器代理1:9092--描述--组测试使用者组(对于旧使用者)
bin/kafka-consumer-groups.sh--引导服务器代理1:9092--描述--组测试使用者组--新使用者(用于新使用者)

相关问题