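I have a Spring Boot application that uses Kafka and Ehcache to synchronize caches across different microservices and instances. I am using Spring Boot 2.2.4 and the matching version of the Kafka client. How can I test that my Kafka client works correctly with embedded Kafka?
I tried:
Test class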
@RunWith(SpringRunner.class)
@SpringBootTest()
@ActiveProfiles({"inmemory", "test", "kafka-test"})
@WebAppConfiguration
@DirtiesContext
public class CachePropagatorTest
{
    private static final String topic = "com.allstate.d3.sh.test.cache";
    //private static final String topic2 = "com.allstate.sh.test.alloc";

    //@Rule
    @ClassRule
    public static final EmbeddedKafkaRule embeddedKafkaRule;

    static
    {
        embeddedKafkaRule = new EmbeddedKafkaRule(1, true, topic);
        embeddedKafkaRule
            .getEmbeddedKafka().brokerListProperty("spring.kafka.bootstrap-servers");
    }

    //@Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    KafkaTemplate<String, CacheMessage> kafkaTemplate;

    @Autowired
    KafkaSHProperties properties;

    //@Autowired
    @SpyBean
    CachePropagator propagator;
    //CachePropagationHelper propagator;

    BlockingQueue<CacheMessage> records = new LinkedBlockingQueue<>();

    /* read sent messages */
    Consumer<Integer, CacheMessage> consumer;

    private String topic1;

    @Before
    public void setUp() throws Exception
    {
        embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
        topic1 = properties.getCacheTopic();
        assertThat(topic1, is(topic));

        //embeddedKafka.getEmbeddedKafka().addTopics(topic1);
        try { embeddedKafka.addTopics(topic1); }
        catch (KafkaException Ignored) { }

        Mockito.doAnswer(new Answer<Void>()
        {
            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable
            {
                System.out.println("Cache Message Receive");
                records.add((CacheMessage) invocation.getArgument(0));
                return (Void)invocation.callRealMethod();
            }
        }).when(propagator).receive(ArgumentMatchers.any(),
                                    ArgumentMatchers.anyString());

        //prove raw template usage
        CacheMessage cm = new CacheMessage("Test","Test","put",
                                           true,"");
        kafkaTemplate.send(topic1, cm);

        Map<String, Object> consumerProps =
            KafkaTestUtils.consumerProps(properties.getCacheConsumptionGroup(),
                                         "false", embeddedKafka);
        DefaultKafkaConsumerFactory<Integer, CacheMessage> cf =
            new DefaultKafkaConsumerFactory<Integer, CacheMessage>(consumerProps);
        consumer = cf.createConsumer();
        embeddedKafka.consumeFromAllEmbeddedTopics(consumer);

        Set<String> topics = embeddedKafka.getTopics();
        assertThat(topics.size(),is(1) );
        assertThat(topics,hasItem(topic1) );

        //prove sent message received
        ConsumerRecord<Integer, CacheMessage> received =
            KafkaTestUtils.getSingleRecord(consumer, topic1, 30000);
        assertThat(received.value(), is("Test"));
    }

    @After
    public void tearDown() throws Exception { }

    @Test
    public void putExperiment() throws Exception
    {
        Date now = new Date();
        JsonNode emptyNode = new ObjectMapper().readTree("");
        List<BucketDetail> buckets = new ArrayList<>();
        buckets.add(new BucketDetail("99-1", "Kafka Bucket 1",
                                     0.5, emptyNode));
        buckets.add(new BucketDetail("99-2", "Kafka Bucket 2",
                                     0.5, emptyNode));
        buckets.add(new BucketDetail());
        ExperimentDetail exp = new ExperimentDetail("99", 1,
                                                    "KafkaTest",
                                                    "SH_TEST_PROFILE_9",
                                                    buckets, LifecycleStage.CONFIGURED,
                                                    now, null, "Mete Test Notes");
        propagator.putExperiment(exp);

        //TODO: test the allocation was correct
        ConsumerRecord<Integer, CacheMessage> received =
            KafkaTestUtils.getSingleRecord(consumer, topic1, 10000);
        //TODO: how much should this verify in the message
        assertThat(received.value().getAction(), is("put"));
        assertThat(received.value().getItem().toString(),
                   containsString(exp.getExperimentID()));
    }
}
application-kafka-test.yml
spring:
  kafka:
    bootstrap-servers: localhost:2181
    listener:
      #add topics after start
      missing-topics-fatal: false
    properties:
      sasl:
        kerberos:
          service:
            name: kafka
      security:
        protocol: SASL_PLAINTEXT
    consumer:
      properties:
        spring:
          json:
            trusted:
              packages: com.allstate.d3.sh.commons.messaging
      bootstrap-servers: localhost:2181
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      bootstrap-servers: localhost:2181
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  profiles:
    active: inmemory,kafka-test
When I run the test, it fails in the test's setUp() method:
java.lang.IllegalStateException: No records found for topic
at org.springframework.kafka.test.utils.KafkaTestUtils.getSingleRecord(KafkaTestUtils.java:187)
at com.allstate.d3.sh.execution.event.CachePropagatorTest.setUp(CachePropagatorTest.java:169)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155)
at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:137)
at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
at java.lang.Thread.run(Thread.java:748)
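So why can't I find the record that was sent to this topic?
Update
The answer from @quicksilver below points to things running in parallel. Could my @SpyBean CachePropagator propagator; be interfering with my test?
The listener method of CachePropagator is defined as follows: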
@KafkaListener(topics = "#{kafkaSHProperties.cacheTopic}",
               groupId = "#{kafkaSHProperties.cacheConsumptionGroup}")
public void receive(@Payload CacheMessage message,
                    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key)
{
    if (!envName.equals(message.getEnv())) { return; }
    log.info("07e9d084-1b8c-4c4c-b9be-9e7bb2716c3c -- Cache sync message: {}, {}, {}",
             key, message.getEnv(), message.getCacheName());
    processMessage(message);
}
Could that listener be consuming my messages? If so, shouldn't they still be available on the broker? If not, is there a setting I can change?
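On the question of whether a listener "uses up" a record: in Kafka's model, records stay in the topic log after they are read, and each consumer group tracks its own read position. The sketch below is a toy in-memory model of that idea, not Kafka or Spring Kafka code; `ToyLog`, `send`, `poll`, and the group names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy single-partition log. NOT the Kafka API, only a model of per-group offsets. */
class ToyLog {
    // Records are appended and never removed by reads.
    private final List<String> records = new ArrayList<>();
    // Next offset to read, tracked separately for each consumer group.
    private final Map<String, Integer> nextOffset = new HashMap<>();

    void send(String value) {
        records.add(value);
    }

    /** Returns everything this group has not read yet and advances only that group's offset. */
    List<String> poll(String groupId) {
        int from = nextOffset.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(records.subList(from, records.size()));
        nextOffset.put(groupId, records.size());
        return batch;
    }

    int size() {
        return records.size();
    }

    public static void main(String[] args) {
        ToyLog log = new ToyLog();
        log.send("Test");
        System.out.println(log.poll("cache-group"));   // the listener's group reads the record
        System.out.println(log.poll("cache-group"));   // the same group sees nothing new
        System.out.println(log.poll("test-verifier")); // a different group still reads it
    }
}
```

Under this model, a test consumer created with the same group ID as the `@KafkaListener` competes with the listener for the same records, whereas a consumer in its own group reads the log independently. Whether that is the cause of the failure here is an assumption that would need to be checked against the real broker.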
1 Answer
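Can you tell me which of the following applies in your code?
The Kafka consumer is started before the Kafka producer and keeps running even after the producer has published the record.
If the Kafka consumer is started after the Kafka producer, then it should poll from the beginning offset.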
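The two cases above turn on where a consumer with no committed offset starts reading. The following is a minimal sketch of that rule as a toy model, not the Kafka client API; `OffsetResetDemo`, `firstPoll`, and `logSizeAtJoin` are hypothetical names. With `latest`, which is what the question's YAML sets for `auto-offset-reset`, a consumer that joins after the record was sent starts past it; with `earliest` it starts at offset 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the starting position for a consumer without a committed offset. */
class OffsetResetDemo {
    enum Reset { EARLIEST, LATEST }

    /**
     * Returns the records a brand-new consumer would see on its first poll.
     *
     * @param log           all records currently in the (single-partition) log
     * @param logSizeAtJoin how many records were already in the log when the consumer joined
     * @param reset         the modelled offset-reset policy
     */
    static List<String> firstPoll(List<String> log, int logSizeAtJoin, Reset reset) {
        int start = (reset == Reset.EARLIEST) ? 0 : logSizeAtJoin;
        return new ArrayList<>(log.subList(start, log.size()));
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("Test");
        // Producer sent first, consumer joined afterwards (log already had 1 record).
        System.out.println(firstPoll(log, 1, Reset.LATEST));
        System.out.println(firstPoll(log, 1, Reset.EARLIEST));
        // Consumer joined first (empty log), producer sent afterwards.
        System.out.println(firstPoll(log, 0, Reset.LATEST));
    }
}
```

In the test from the question, `kafkaTemplate.send(...)` runs before the test consumer is created, so under this model the outcome depends on the reset policy that consumer actually ends up with, which is worth checking in the resolved consumer properties.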
The test below works on my machine: