classcastexception与spring云和kafka流

pes8fvy9  于 2021-07-13  发布在  Java
关注(0)|答案(1)|浏览(423)

我开始了一个新的sb应用程序,将作为Kafka的消费者,我开始玩Kafka流,但我得到以下例外启动应用程序时
java.lang.classcastexception:无法将类com.sun.proxy.$proxy101强制转换为类org.springframework.messaging.messagechannel(com.sun.proxy.$proxy101和org.springframework.messaging.messagechannel位于加载程序“app”的未命名模块中)org.springframework.cloud.stream.binder.abstractmessagechannelbinder.dobindconsumer(abstractmessagechannelbinder)。java:91)~[spring-cloud-stream-3.1.2。jar:3.1.2]位于org.springframework.cloud.stream.binder.abstractbinder.bindconsumer(abstractbinder.com)。java:143)~[spring-cloud-stream-3.1.2。jar:3.1.2]在org.springframework.cloud.stream.binding.bindingservice.lambda$rescheduleconsumerbinding$1(bindingservice)。java:201)~[spring-cloud-stream-3.1.2。jar:3.1.2]在org.springframework.scheduling.support.delegatingerrorhandlingrunnable.run(delegatingerrorhandlingrunnable。java:54)~[spring-context-5.3.5。jar:5.3.5]在java.base/java.util.concurrent.executors$runnableadapter.call(executors。java:515)~[na:na]位于java.base/java.util.concurrent.futuretask.run(futuretask。java:264)~[na:na]位于java.base/java.util.concurrent.scheduledthreadpoolexecutor$scheduledfuturetask.run(scheduledthreadpoolexecutor)。java:304)~[na:na]在java.base/java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor。java:1128)~[na:na]位于java.base/java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor。java:628)~[na:na]在java.base/java.lang.thread.run(thread。java:834)~[娜:安]
这就是我如何声明一个字被发送多少次的kstream:

@Bean
  public Consumer<KStream<Bytes, String>> target() {
    return input -> input.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .map((key, value) -> new KeyValue<>(value, value))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(Duration.ofMillis(30000))).count(Materialized.as("words-count"))
        .toStream().map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
            new Date(key.window().start()), new Date(key.window().end()))));
  }

使用相同的application.yml并具有仅用于接收这样的消息的使用者函数

@Bean
  public Consumer<Message<String>> target() {
    return message -> {
      System.out.println("******************");
      System.out.println("Received message from source: " + message.getPayload());
    };
  }

一切正常。
我使用以下版本:
springboot v2.4.4版
springcloud版本2020.0.2
java v11.0.10版
在单元测试期间,我也看到过其他帖子出现这种错误,但我甚至还没有进行过任何junit测试。
任何帮助都将不胜感激

xmd2e60i

xmd2e60i1#

看起来类路径中没有kafka streams绑定器,而是消息通道绑定器。确保你有依赖关系 spring-cloud-stream-binder-kakfa-streams 在类路径上。

相关问题