在尝试对SpringKafka使用者进行单元测试时,如何解决配置中的错误?

icomxhvb  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(359)

代码位置
我认为可能有很多模块可以让这个问题看起来很清楚,所以这里是回购协议。我希望包括所有必要的组成部分。https://github.com/ewingian/restcalculator
问题
我正在学习编写Kafka服务,这个过程包括为生产者和消费者学习单元测试。遵循了一个关于与使用者一起设置单元测试的教程。当我运行测试时,我收到一个类配置错误。
错误

9:34:59.547 [main] WARN kafka.server.BrokerMetadataCheckpoint - No meta.properties file under dir /tmp/kafka-8346130278143417083/meta.properties
09:34:59.567 [main] ERROR kafka.server.KafkaServer - [Kafka Server 0], Fatal error during KafkaServer startup. Prepare to shutdown
java.lang.NoClassDefFoundError: org/apache/kafka/common/network/LoginType
    at kafka.network.Processor.<init>(SocketServer.scala:406)
    at kafka.network.SocketServer.newProcessor(SocketServer.scala:141)
    at kafka.network.SocketServer$$anonfun$startup$1$$anonfun$apply$1.apply$mcVI$sp(SocketServer.scala:94)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:93)
    at kafka.network.SocketServer$$anonfun$startup$1.apply(SocketServer.scala:89)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at kafka.network.SocketServer.startup(SocketServer.scala:89)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:219)
    at kafka.utils.TestUtils$.createServer(TestUtils.scala:120)
    at kafka.utils.TestUtils.createServer(TestUtils.scala)
    at org.springframework.kafka.test.rule.KafkaEmbedded.before(KafkaEmbedded.java:154)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.network.LoginType
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 23 common frames omitted

图为我的目录结构

当我在idea的项目结构设置中查看库列表时,我看到 org.apache.kafka:kafka-clients:0.11.0.0 ; 但是,我无法导入丢失的模块,我知道它是kafka客户机的一部分( org/apache/kafka/common/network/LoginType )
问题
以前有人遇到过这个错误吗?我的gradle文件配置错误了吗?我的项目目录设置是否正确,以有效地解决我可能缺少的问题?执行kafka单元测试?我还没有在logintype上找到太多信息,但会继续搜索。
这是gradle构建文件的副本:

buildscript {
    repositories {
        mavenCentral()
    }misconfigured
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:1.5.10.RELEASE")
    }
}

apply plugin: 'java'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'org.springframework.boot'

jar {
    baseName = 'calculator'
    version =  '0.1.0'
}

repositories {
    mavenCentral()
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
    compile("org.springframework.boot:spring-boot-starter-web")
    testCompile("org.springframework.boot:spring-boot-starter-test")
    compile("org.springframework.kafka:spring-kafka:1.3.2.RELEASE")
    testCompile("org.springframework.kafka:spring-kafka-test")
}

task wrapper(type: Wrapper) {
    gradleVersion = '2.3'
}

如果还有什么我可能需要包括在这个问题,请让我知道。谢谢
单元测试代码

package com.calculator;

/**
 * Created by ian on 2/9/18.
 */
import com.calculator.kafka.services.*;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertThat;
import static org.springframework.kafka.test.assertj.KafkaConditions.key;
import static org.springframework.kafka.test.hamcrest.KafkaMatchers.hasValue;

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaTest {

//    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTest.class);
    private static final String TEMPLATE_TOPIC = "input";
    private static String SENDER_TOPIC = "input";
    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TEMPLATE_TOPIC);

    private KafkaMessageListenerContainer<String, Integer> container;

    private BlockingQueue<ConsumerRecord<String, Integer>> records;

    @Autowired
    private KafkaConsumer consumer;

    @Before
    public void setUp() {
        // Set up the consumer properties
        Map<String, Object> integerProperties = KafkaTestUtils.consumerProps("jsa-group", "false", embeddedKafka);

        // create a Kafka consumer factory
        DefaultKafkaConsumerFactory<String, Integer> consumerFactory = new DefaultKafkaConsumerFactory<String, Integer>(integerProperties);

        // set the topic that needs to be consumed
        ContainerProperties containerProperties = new ContainerProperties(SENDER_TOPIC);

        // create a Kafka MessageListenerContainer
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);

        // setup a Kafka message listener
        container.setupMessageListener(new MessageListener<String, Integer>() {
            @Override
            public void onMessage(ConsumerRecord<String, Integer> record) {
//                LOGGER.debug("test-listener received message='{}'", record.toString());
                records.add(record);
            }
        });

        // start the container and underlying message listener
        container.start();
    }

    @After
    public void tearDown() {
        // stop the container
        container.stop();
    }

    @Test
    public void testTemplate() throws Exception {
        // send the message
        String greeting = "Hello Spring Kafka Sender!";
        Integer i1 = 12;
        consumer.processMessage(i1);

        // check that the message was received
        ConsumerRecord<String, Integer> received = records.poll(10, TimeUnit.SECONDS);
        // Hamcrest Matchers to check the value
        assertThat(received, hasValue(i1));
        // AssertJ Condition to check the key
        assertThat(received).has(key(null));
    }
}
0kjbasz6

0kjbasz61#

以下是我发现的总结:
我没有使用最新的spring引导版本。在教程之后,我使用了1.5.10.release。这没什么问题,但它和SpringKafka有兼容性问题。
我试着使用最新版本的springkafka和springboot1.5.10.release。在处理某些找不到的类时不断出现错误。不得不将Kafka的版本降低到1.3.2.0版本
此配置允许我运行spring引导应用程序,但单元测试失败,因此出现堆栈溢出问题。
我试图重写我的gradle文件以使用较新的spring boot和kafka,但是失败了,我认为存储库不好。
解决方案
最后去了spring的网站并使用了他们的项目生成器。在最新的 Spring Kafka和推动拉。这给了我一个新的建设。grad尔与适当的回购。在中添加了springkafka测试,并成功执行了单元测试。

相关问题