如何让kafka在springxd运行时使用http流数据?

w9apscun  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(466)

有没有可能让kafka源模块在springxd运行时作为处理器模块工作?有代码样本吗?
我正在尝试实现这样的功能:http(xd源)| kafka源(xd处理器)| kafka消费者(xd接收器)
我尝试这样做是因为我有来自http的流数据,我想用kafka消息总线来管理。
我的流定义如下:

stream create kafkaSourceTest --definition "http --outputType=application/json | kafka --zkconnect=localhost:2181 --topic=kafkaTestTopic | log " --deploy

在spring xd的处理器模块中使用kafka源模块实现的现成实现会导致如下错误:

2015-05-12 11:18:52,914 1.1.1.RELEASE ERROR pool-13-thread-4 http.NettyHttpInboundChannelAdapter - Error sending message

org.springframework.messaging.messagedeliveryexception:dispatcher没有频道“”的订阅服务器admin:default,管理,单节点,hsqldbserver:9393.kafkasourcetest.0'.; 嵌套的异常是org.springframework.integration.messagedispatchingexception:dispatcher在org.springframework.integration.channel.abstractsubscribablechannel.dosend(abstractsubscribablechannel)没有订阅服务器。java:81)在org.springframework.integration.channel.abstractmessagechannel.send(abstractmessagechannel。java:277)在org.springframework.integration.channel.abstractmessagechannel.send(abstractmessagechannel)。java:239)在org.springframework.messaging.core.genericmessagingtemplate.dosend(genericmessagingtemplate。java:115)在org.springframework.messaging.core.genericmessagingtemplate.dosend(genericmessagingtemplate。java:45)

dkqlctbz

dkqlctbz1#

我尝试这样做是因为我有来自http的流数据,我想用kafka消息总线来管理。
如果您使用kafka作为消息总线(在设置transport之后),那么像“http | log”这样的流将使http消息流通过kafka消息总线。在本例中,kafka代理中的主题将由xd内部构件定义。
有没有可能让kafka源模块在springxd运行时作为处理器模块工作?
不,源模块不能充当处理器模块。如果您希望消息流经kafka中的特定主题,那么您可以拥有一个流,该流具有从http源接收数据的kafka接收器模块和另一个流,该流使用相同的主题配置kafka源模块。
这可以通过以下方式实现:
stream create kafkasink--definition“http--outputtype=application/json | kafka--brokerlist=--topic=kafkatesttopic”--部署
流创建kafkasource--定义“kafka--连接”=localhost:2181 --topic=kafkatesttopic |日志”--部署

相关问题