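I'm not sure whether I'm misreading the documentation for the Camel MongoDB component, but it seems to say that the incoming body is converted to a Document or a List so that it can be inserted into a collection.
Basically, I have a very simple route (TransformRoute) that takes CSV data, unmarshals it into Java beans, marshals those to JSON with Jackson, and sends the result over ActiveMQ. If I send each Java bean as its own message over ActiveMQ, it works as expected, but if the whole CSV file is converted into a single list of Java beans and sent as one message, it fails, and honestly I'm a bit at a loss.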
TransformRoute
@Component
public class TransformRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        DataFormat bind = new BindyCsvDataFormat(UserBean.class);
        from("activemq:unpacked")
            .routeId("transformRoute")
            .unmarshal(bind)
            .marshal().json(JsonLibrary.Jackson, true)
            .log("${body}")
            .to("activemq:save");
    }
}
PersistRoute
@Component
public class PersistRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("activemq:save")
            .routeId("persistRoute")
            .log("Received message at PersistRoute")
            .to("mongodb:connectionBean?database=test&collection=user&operation=save")
            .setBody(simple("${body}"));
    }
}
UserBean
@Getter
@Setter
@NoArgsConstructor
@CsvRecord(separator = ",", isOrdered = true)
public class UserBean implements Serializable {
    @DataField(pos = 1)
    private Integer id;
    @DataField(pos = 2)
    private String firstName;
    @DataField(pos = 3)
    private String surName;
    @DataField(pos = 4)
    private String workEmail;
    @DataField(pos = 5)
    private String personalEmail;
    @DataField(pos = 6)
    private String profession;
}
Error
Message History (source location and message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source ID Processor Elapsed (ms)
persistRoute/persistRoute from[activemq://save] 3800927
...
persistRoute/to4 mongodb:connectionBean?database=test&collection=us 0
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.component.mongodb.CamelMongoDbException: Body incorrect type for save
at org.apache.camel.component.mongodb.MongoDbProducer.lambda$createDoSave$14(MongoDbProducer.java:643) ~[camel-mongodb-3.19.0.jar:3.19.0]
at org.apache.camel.component.mongodb.MongoDbProducer.lambda$wrap$0(MongoDbProducer.java:260) ~[camel-mongodb-3.19.0.jar:3.19.0]
at org.apache.camel.component.mongodb.MongoDbProducer.invokeOperation(MongoDbProducer.java:138) ~[camel-mongodb-3.19.0.jar:3.19.0]
at org.apache.camel.component.mongodb.MongoDbProducer.process(MongoDbProducer.java:125) ~[camel-mongodb-3.19.0.jar:3.19.0]
at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66) ~[camel-support-3.19.0.jar:3.19.0]
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172) ~[camel-core-processor-3.19.0.jar:3.19.0]
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477) ~[camel-core-processor-3.19.0.jar:3.19.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181) ~[camel-base-engine-3.19.0.jar:3.19.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59) ~[camel-base-engine-3.19.0.jar:3.19.0]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:175) ~[camel-core-processor-3.19.0.jar:3.19.0]
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392) ~[camel-base-engine-3.19.0.jar:3.19.0]
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) ~[camel-base-engine-3.19.0.jar:3.19.0]
at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41) ~[camel-support-3.19.0.jar:3.19.0]
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:132) ~[camel-jms-3.19.0.jar:3.19.0]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736) ~[spring-jms-5.3.23.jar:5.3.23]
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696) ~[spring-jms-5.3.23.jar:5.3.23]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.3.23.jar:5.3.23]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:331) ~[spring-jms-5.3.23.jar:5.3.23]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:270) ~[spring-jms-5.3.23.jar:5.3.23]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1237) ~[spring-jms-5.3.23.jar:5.3.23]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1227) ~[spring-jms-5.3.23.jar:5.3.23]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1120) ~[spring-jms-5.3.23.jar:5.3.23]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.apache.camel.InvalidPayloadException: No body available of type: org.bson.Document but has type: byte[] on: JmsMessage[JmsMessageID: ID:atlantis-35375-1665732206912-51:1:3:1:1]. Caused by: Error during type conversion from type: byte[] to the required type: org.bson.Document with value [B@7f2e6fb due to org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is ARRAY.. Exchange[]. Caused by: [org.apache.camel.TypeConversionException - Error during type conversion from type: byte[] to the required type: org.bson.Document with value [B@7f2e6fb due to org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is ARRAY.]
at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:125) ~[camel-support-3.19.0.jar:3.19.0]
at org.apache.camel.component.mongodb.MongoDbProducer.lambda$createDoSave$14(MongoDbProducer.java:627) ~[camel-mongodb-3.19.0.jar:3.19.0]
... 24 common frames omitted
Caused by: org.apache.camel.TypeConversionException: Error during type conversion from type: byte[] to the required type: org.bson.Document with value [B@7f2e6fb due to org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is ARRAY.
at org.apache.camel.support.SimpleTypeConverter.convertTo(SimpleTypeConverter.java:105) ~[camel-support-3.19.0.jar:3.19.0]
at org.apache.camel.impl.converter.CoreTypeConverterRegistry.doConvertTo(CoreTypeConverterRegistry.java:479) ~[camel-base-3.19.0.jar:3.19.0]
at org.apache.camel.impl.converter.CoreTypeConverterRegistry.doConvertTo(CoreTypeConverterRegistry.java:358) ~[camel-base-3.19.0.jar:3.19.0]
at org.apache.camel.impl.converter.CoreTypeConverterRegistry.mandatoryConvertTo(CoreTypeConverterRegistry.java:271) ~[camel-base-3.19.0.jar:3.19.0]
at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:123) ~[camel-support-3.19.0.jar:3.19.0]
... 25 common frames omitted
Caused by: org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is ARRAY.
at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:689) ~[bson-4.7.1.jar:na]
at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:721) ~[bson-4.7.1.jar:na]
at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:449) ~[bson-4.7.1.jar:na]
at org.bson.codecs.DocumentCodec.decode(DocumentCodec.java:177) ~[bson-4.7.1.jar:na]
at org.apache.camel.component.mongodb.converters.MongoDbBasicConverters.fromByteArrayToDocument(MongoDbBasicConverters.java:91) ~[camel-mongodb-3.19.0.jar:3.19.0]
at org.apache.camel.component.mongodb.converters.MongoDbBasicConvertersLoader.lambda$registerConverters$2(MongoDbBasicConvertersLoader.java:49) ~[camel-mongodb-3.19.0.jar:3.19.0]
at org.apache.camel.support.SimpleTypeConverter.convertTo(SimpleTypeConverter.java:101) ~[camel-support-3.19.0.jar:3.19.0]
... 29 common frames omitted
2022-10-14 10:36:38.570 WARN 5841 --- [sConsumer[save]] o.a.c.c.jms.EndpointMessageListener : Execution of JMS message listener failed. Caused by: [org.apache.camel.TypeConversionException - Error during type conversion from type: byte[] to the required type: org.bson.Document with value [B@7f2e6fb due to org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is ARRAY.]
org.apache.camel.TypeConversionException: Error during type conversion from type: byte[] to the required type: org.bson.Document with value [B@7f2e6fb due to org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is ARRAY.
at org.apache.camel.support.SimpleTypeConverter.convertTo(SimpleTypeConverter.java:105) ~[camel-support-3.19.0.jar:3.19.0]
at org.apache.camel.impl.converter.CoreTypeConverterRegistry.doConvertTo(CoreTypeConverterRegistry.java:479) ~[camel-base-3.19.0.jar:3.19.0]
at org.apache.camel.impl.converter.CoreTypeConverterRegistry.doConvertTo(CoreTypeConverterRegistry.java:358) ~[camel-base-3.19.0.jar:3.19.0]
at org.apache.camel.impl.converter.CoreTypeConverterRegistry.mandatoryConvertTo(CoreTypeConverterRegistry.java:271) ~[camel-base-3.19.0.jar:3.19.0]
at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:123) ~[camel-support-3.19.0.jar:3.19.0]
at org.apache.camel.component.mongodb.MongoDbProducer.lambda$createDoSave$14(MongoDbProducer.java:627) ~[camel-mongodb-3.19.0.jar:3.19.0]
at org.apache.camel.component.mongodb.MongoDbProducer.lambda$wrap$0(MongoDbProducer.java:260) ~[camel-mongodb-3.19.0.jar:3.19.0]
at org.apache.camel.component.mongodb.MongoDbProducer.invokeOperation(MongoDbProducer.java:138) ~[camel-mongodb-3.19.0.jar:3.19.0]
at org.apache.camel.component.mongodb.MongoDbProducer.process(MongoDbProducer.java:125) ~[camel-mongodb-3.19.0.jar:3.19.0]
at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66) ~[camel-support-3.19.0.jar:3.19.0]
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172) ~[camel-core-processor-3.19.0.jar:3.19.0]
at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477) ~[camel-core-processor-3.19.0.jar:3.19.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181) ~[camel-base-engine-3.19.0.jar:3.19.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59) ~[camel-base-engine-3.19.0.jar:3.19.0]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:175) ~[camel-core-processor-3.19.0.jar:3.19.0]
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392) ~[camel-base-engine-3.19.0.jar:3.19.0]
at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) ~[camel-base-engine-3.19.0.jar:3.19.0]
at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41) ~[camel-support-3.19.0.jar:3.19.0]
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:132) ~[camel-jms-3.19.0.jar:3.19.0]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736) ~[spring-jms-5.3.23.jar:5.3.23]
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696) ~[spring-jms-5.3.23.jar:5.3.23]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.3.23.jar:5.3.23]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:331) ~[spring-jms-5.3.23.jar:5.3.23]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:270) ~[spring-jms-5.3.23.jar:5.3.23]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1237) ~[spring-jms-5.3.23.jar:5.3.23]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1227) ~[spring-jms-5.3.23.jar:5.3.23]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1120) ~[spring-jms-5.3.23.jar:5.3.23]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is ARRAY.
at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:689) ~[bson-4.7.1.jar:na]
at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:721) ~[bson-4.7.1.jar:na]
at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:449) ~[bson-4.7.1.jar:na]
at org.bson.codecs.DocumentCodec.decode(DocumentCodec.java:177) ~[bson-4.7.1.jar:na]
at org.apache.camel.component.mongodb.converters.MongoDbBasicConverters.fromByteArrayToDocument(MongoDbBasicConverters.java:91) ~[camel-mongodb-3.19.0.jar:3.19.0]
at org.apache.camel.component.mongodb.converters.MongoDbBasicConvertersLoader.lambda$registerConverters$2(MongoDbBasicConvertersLoader.java:49) ~[camel-mongodb-3.19.0.jar:3.19.0]
at org.apache.camel.support.SimpleTypeConverter.convertTo(SimpleTypeConverter.java:101) ~[camel-support-3.19.0.jar:3.19.0]
... 29 common frames omitted
Logged data
[ {
"id" : 100,
"firstName" : "Alexine",
"surName" : "McClimans",
"workEmail" : "Alexine.McClimans@yopmail.com",
"personalEmail" : "Alexine.McClimans@gmail.com",
"profession" : "police officer"
}, {
"id" : 101,
"firstName" : "Emmey",
"surName" : "Wildermuth",
"workEmail" : "Emmey.Wildermuth@yopmail.com",
"personalEmail" : "Emmey.Wildermuth@gmail.com",
"profession" : "firefighter"
} ]
What am I missing?
1 Answer
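The save operation does not work with a collection of documents; it expects exactly one document. The error you get comes from the fact that it assumes you are trying to save a single document. Because the body could be either JSON or BSON, it first tries to detect which of the two it is, and here it (wrongly) detected BSON, which is why you end up with this odd error.
In your case, you should use the insert operation instead, which supports a single document, a collection of documents, or an array of documents. Your route should look like this: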
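The route itself is cut off in this copy of the answer, so the following is only a sketch of what the suggestion seems to amount to: the PersistRoute from the question with operation=save swapped for operation=insert in the endpoint URI. The endpoint names, connectionBean, database and collection are taken from the question. Dropping the trailing setBody call is an assumption on my part, since it appears to have no effect. Whether the raw JSON array arriving from ActiveMQ is converted to a list of documents without an extra conversion step may depend on the Camel version, so treat this as a starting point rather than a verified fix.

```java
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class PersistRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("activemq:save")
            .routeId("persistRoute")
            .log("Received message at PersistRoute")
            // operation=insert instead of operation=save: per the answer, insert
            // accepts a single document, a collection of documents, or an array,
            // whereas save expects exactly one document.
            .to("mongodb:connectionBean?database=test&collection=user&operation=insert");
    }
}
```

With this change, the JSON array shown under "Logged data" would be handed to MongoDB as several documents rather than being forced into a single org.bson.Document, which is the conversion that fails in the stack trace above.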