rabbitmq Spring数据流:文件源在脚本处理器启动之前将有效负载发送到队列

ac1kyiln  于 2023-01-26  发布在  RabbitMQ
关注(0)|答案(1)|浏览(131)

我使用来自https://dataflow.spring.io/rabbitmq-maven-latest的预定义rabbitmq应用程序通过Spring DataFlow创建了非常简单的流。

file --mode=content --directory=/home/cnb/documents | script | log

问题是当我部署流时,文件源在脚本处理器应用程序启动之前读取文件。因此,我遇到了一个错误,即文件源没有接收解析文件有效负载的订阅者-日志如下:

2023-01-16 09:27:55.425  INFO [file-source,,] 333 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: rabbit
2023-01-16 09:27:55.614 ERROR [file-source,88b4a7f13a17dd71,86aacf31285b41dd] 333 --- [oundedElastic-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'fileReadingFlow.channel#2'; nested exception is java.lang.IllegalStateException: The [bean 'fileReadingFlow.channel#2'; defined in: 'org.springframework.cloud.fn.supplier.file.FileSupplierConfiguration'; from source: 'bean method fileReadingFlow'] doesn't have subscribers to accept messages, failedMessage=GenericMessage [payload=byte[33511], headers={b3=88b4a7f13a17dd71-f5dbcbe1db7e1bff-0, nativeHeaders={}, file_name=test1.csv, file_originalFile=/home/cnb/documents/test1.csv, id=10dc5789-f59d-d3c3-7c11-4e46f761cff6, contentType=application/octet-stream, file_relativePath=test1.csv, timestamp=1673861275578}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:166)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:457)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:325)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:268)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:232)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:457)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:325)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:268)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:232)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:88)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:296)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:277)
    at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:200)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:477)
    at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:268)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
    at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
    at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
    at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
    at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:82)
    at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:71)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$5(FluxMessageChannel.java:123)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
    at org.springframework.cloud.sleuth.instrument.reactor.ReactorSleuth.lambda$null$6(ReactorSleuth.java:324)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalStateException: The [bean 'fileReadingFlow.channel#2'; defined in: 'org.springframework.cloud.fn.supplier.file.FileSupplierConfiguration'; from source: 'bean method fileReadingFlow'] doesn't have subscribers to accept messages
    at org.springframework.util.Assert.state(Assert.java:97)
    at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    ... 55 more

当我将文件“test1.csv”放在部署的流之后时,一切正常。但当我部署流并且“test1.csv”已在目录中时-我看到由于流组件的启动竞争而导致的上述错误。
数据流中是否有方法:
1.设置流组件的启动顺序?(例如,我想按1 -脚本、2 -文件、3 -日志的顺序启动流组件)
1.也许我可以通过Dataflow UI为文件源配置一些启动超时?因此,通过此操作,我将实现文件源开始读取文件时的启动超时-此时流处理器将准备好处理此文件。
提前感谢,请让我知道,如果需要的话,我可以提供更多的细节!
如果我有多个文件在目录中,我看,并已处理他们由我的流。当我重新启动数据流服务器-这些文件将被再次处理?谢谢提前!

bihw5rsg

bihw5rsg1#

我能想到的解决这个问题的唯一方法可能是使用一个命名主题。

file --mode=content --directory=/home/cnb/documents > :file-topic
:file-topic > script | log

相关问题