java中的多线程spark流单元测试

s2j5cfk0  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(379)

我正在尝试为我的spark流工作编写单元测试。我的spark流媒体作业使用来自 MQ 把它推到 kafka 主题。
我的方法是
向mq发送测试消息
在单独的线程中启动流作业(流作业将消息推送到Kafka主题“topic1”)
Kafka消费者继续投票的主题1。
收到消息后,停止线程并从循环中断开。
下面是我的代码和它不工作。spark流媒体工作启动良好,但一旦流媒体工作启动,我的 while 循环停止循环。我不太清楚原因,因为我是新来的 Concurrency 主题

public class StreamingJobTest {

private static KafkaConsumer<String, String> consumer;

@BeforeClass
public static void setUpClass()  {

    Properties properties = new Properties();

    properties.put("bootstrap.servers", "localhost:9090");
    properties.put("subscribe", "topic1");
    properties.put("startingOffsets", "earliest");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    consumer = new KafkaConsumer<String, String>(properties);

}

@Test
public void create_test() {
    String[] arguments = new String[]{};
    ConsumerRecords<String, String> records;

    Thread thread = new Thread(() -> StreamingJob.main(arguments));
    thread.start();

     //send a message to MQ.

    MqSender mqSender = new MqSender();
    mqSender.mqPushMsg("TestMsg");

    //keep polling the kafka topic.

    while(true){
        System.out.println("Polling...");
        records = consumer.poll(100);

        if(!records.isEmpty()){

            thread.interrupt();
            break;
        }

    assertNotNull(records);

    }

}

}
为什么我的循环在流作业开始后停止工作?据我所知,流媒体将运行在单独的线程对吗?

qgelzfjb

qgelzfjb1#

我自己想起来了。我需要在另一行订阅这个主题。我把它加到我的财产里了。而且groupid在kafka中是必须的,我错过了。现在对我来说很好。下面是订阅主题的代码。

consumer.subscribe(Arrays.asList("topic1"));

相关问题