JUnit Kafka:先向某个主题生产消息,然后再从该主题消费?

tct7dpnv  于 2022-11-11  发布在  Kafka
关注(0)|答案(1)|浏览(165)

我是一名传统的ActiveMQ用户,正在学习Kafka。我有一个问题。
使用Active MQ,您可以执行以下操作:

  • 将100条消息提交到队列中
  • 等待任意长的时间
  • 消费该队列中的这100条消息,并保证每条消息只被一个消费者消费。

我试图用Kafka做同样的事情:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class KafkaTest {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaTest.class);
    public static final String MY_GROUP_ID = "my-group-id";
    public static final String TOPIC = "topic";

    KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

    @Before
    public void before() {
        kafka.start();
    }

    @After
    public void after() {
        kafka.close();
    }

    @Test
    public void testPipes() throws ExecutionException, InterruptedException {

        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        consumerProps.put("group.id", MY_GROUP_ID);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        ExecutorService es = Executors.newCachedThreadPool();
        Future consumerFuture = es.submit(() -> {
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
                consumer.subscribe(Collections.singletonList(TOPIC));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    for (ConsumerRecord<String, String> record : records) {
                        LOG.info("Thread: {}, Topic: {}, Partition: {}, Offset: {}, key: {}, value: {}", Thread.currentThread().getName(), record.topic(), record.partition(), record.offset(), record.key(), record.value().toUpperCase());
                    }
                }
            } catch (Exception e) {
                LOG.error("Consumer error", e);
            }
        });

        Thread.sleep(10000); // NOTICE! if you remove this, the consumer will not receive the messages. because the consumer won't be registered yet before the messages come rolling on in.

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        Future producerFuture = es.submit(() -> {
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                int counter = 0;

                while (counter <= 100) {
                    System.out.println("Sent " + counter);
                    String msg = "Message " + counter;
                    producer.send(new ProducerRecord<>(TOPIC, msg));
                    counter++;
                }

            } catch (Exception e) {
                LOG.error("Failed to send message by the producer", e);
            }
        });

        producerFuture.get();
        consumerFuture.get();
    }
}

除非先启动Consumer、等待它完成订阅,然后再运行Producer,否则此示例不起作用。
有谁能告诉我如何修改我的示例程序,使消费者能够消费那些在它启动之前就已经发送、正在等待被消费的消息?

fquxozlt

fquxozlt1#

在您的消费者配置中,您需要添加auto.offset.reset=earliest,或者在订阅后调用seekToBeginning。
换句话说,在默认配置(auto.offset.reset=latest)下,如果你在生产者之后启动消费者,它将从所有现有数据之后的位置开始读取,因此读不到之前已写入的消息。

相关问题