我正在尝试为我的spark流工作编写单元测试。我的spark流媒体作业使用来自 MQ
把它推到 kafka
主题。
我的方法是
向mq发送测试消息
在单独的线程中启动流作业(流作业将消息推送到Kafka主题“topic1”)
Kafka消费者继续投票的主题1。
收到消息后,停止线程并从循环中断开。
下面是我的代码和它不工作。spark流媒体工作启动良好,但一旦流媒体工作启动,我的 while
循环停止循环。我不太清楚原因,因为我是新来的 Concurrency
主题
public class StreamingJobTest {
private static KafkaConsumer<String, String> consumer;
@BeforeClass
public static void setUpClass() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9090");
properties.put("subscribe", "topic1");
properties.put("startingOffsets", "earliest");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<String, String>(properties);
}
@Test
public void create_test() {
String[] arguments = new String[]{};
ConsumerRecords<String, String> records;
Thread thread = new Thread(() -> StreamingJob.main(arguments));
thread.start();
//send a message to MQ.
MqSender mqSender = new MqSender();
mqSender.mqPushMsg("TestMsg");
//keep polling the kafka topic.
while(true){
System.out.println("Polling...");
records = consumer.poll(100);
if(!records.isEmpty()){
thread.interrupt();
break;
}
assertNotNull(records);
}
}
}
为什么我的循环在流作业开始后停止工作?据我所知,流媒体将运行在单独的线程对吗?
1条答案
按热度按时间qgelzfjb1#
我自己想起来了。我需要在另一行订阅这个主题。我把它加到我的财产里了。而且groupid在kafka中是必须的,我错过了。现在对我来说很好。下面是订阅主题的代码。