Spring XD: Error while processing KafkaMessage, MessageDispatchingException: Dispatcher has no subscribers

piwo6bdm  posted on 2021-06-08 in Kafka

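I'm using Spring XD to read a topic from Kafka, filter the data with a Spark streaming processor, and sink the data into Spark.
The command I use to deploy the stream is: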

stream create spark-streaming-word-count --definition "kafka --zkconnect=localhost:2181 --topic=log-stream | java-word-count | log" --deploy

The error I get is:

2015-05-23 11:36:16,190 1.1.1.RELEASE ERROR dispatcher-1 listener.LoggingErrorHandler - Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3699841462, key = java.nio.HeapByteBuffer[pos=0 lim=6 cap=437], payload = java.nio.HeapByteBuffer[pos=0 lim=427 cap=427]), KafkaMessageMetadata [offset=26353, nextOffset=26354, Partition[topic='log-stream', id=0]]
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'admin:default,admin,singlenode,hsqldbServer:9393.spark-streaming-word-count.0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:239)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:248)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:171)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:119)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)

Please help me solve this problem.
Thanks


cbjzeqam  #1

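What is the stream's deployment status? The shell command stream list will show it. Also try runtime modules to see which modules are actually running. It looks like the downstream module java-word-count has not been deployed, so the Kafka source has no subscriber on its output channel.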
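
A sketch of how those checks might look in the XD shell, assuming the stream name from the question. The module list step and the undeploy/deploy pair at the end are assumptions about reasonable next steps, not something confirmed to fix this case. The shell's output is omitted because it depends on the installation.

```
xd:>stream list
xd:>runtime modules
xd:>module list
xd:>stream undeploy --name spark-streaming-word-count
xd:>stream deploy --name spark-streaming-word-count
```

If java-word-count is not among the processors shown by module list, or never appears in runtime modules after the redeploy, nothing is subscribed to the Kafka source's output channel, which would be consistent with the exception in the question.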
