我正在尝试使用springcloudstream来发布和消费kafka消息。我一直在研究访问绑定频道的文档。我尝试在频道上为我的主题使用一个自定义名称,所以在尝试注入时有一个@qualifier,但是spring找不到相关的bean。它说“对于每个绑定的接口,springcloudstream将生成一个实现接口的bean”,但是自动连接不起作用。
我得到的错误是“com中构造函数的参数0…messagingmanager需要找不到类型为'org.springframework.messaging.messagechannel'的bean。”
我尝试在messagingmanager构造函数之前使用@autowired,就像在示例中一样,但是在bean工厂中得到了一个类似的错误,关于有两个这样的构造函数,所以我去掉了它,得到了当前的错误。
可能是因为我想用处理器而变得复杂了。
这是我的部件。我正在用spring boot运行它,并尝试用以下方法测试它:
@Component
public class StartupTester implements ApplicationListener<ContextRefreshedEvent> {
MessagingManager messagingManager;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
messagingManager.sendThingCreatedMessage(new ThingCreated("12345", "667788"));
}
}
@Component
public class MessagingManager {
private MessageChannel thingCreatedChannel;
public MessagingManager(@Qualifier(ThingChannelProcessor.THING_CREATED) MessageChannel output) {
thingCreatedChannel = output;
}
public void sendThingCreatedMessage(ThingCreated thingCreated) {
thingCreatedChannel.send(MessageBuilder.withPayload(thingCreated).build());
}
}
@Component
public interface ThingsChannelProcessor extends Processor {
String THING_REQUEST = "thing-request";
String THING_CREATED = "thing-created";
@Input(THING_REQUEST )
SubscribableChannel thingsRequest();
@Output(THING_CREATED )
MessageChannel thingCreated();
}
我的主类上也有@enablebinding(thingsmessagingmanager.class),用@springbootapplication注解。
1条答案
按热度按时间b1zrtrql1#
我无法再现你的错误。但我有几点你可以理解:
您不需要使用
@Component
你的电脑好像有错@EnableBinding
你应该有@EnableBinding(ThingsChannelProcessor.class)
不是什么消息经理您也不需要扩展处理器,这可能是您第一次得到2个bean的原因。如果您正在自定义频道,则不需要从接收器/源/处理器下降,请查看
Barista
文档中的示例侦听contextrefresh也不会起作用,因为我们在刷新上下文后进行绑定
实际上,让我更清楚一点
4
. 我们创建了一个子上下文,因此为了确保您的上下文已经完全初始化,请确保您也在启动程序上实现了applicationcontextaware,并且在发送消息之前检查上下文是否相同,否则您将得到一个错误if(this.context.equals(event.getApplicationContext()))