How can Camel save a list of objects to MongoDB?

vnzz0bqm  posted on 2022-11-07 in Apache

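I'm not sure whether I have misunderstood the documentation for Camel's MongoDB component, but as I read it, the incoming body is converted to a Document or a List so that it can be inserted into the collection.
Basically, I have a very simple route (TransformRoute) that takes CSV data, converts it to Java beans, marshals them to JSON with Jackson, and sends the result over ActiveMQ. If I send each Java bean as its own message over ActiveMQ, it works as expected. But if I convert the whole CSV file into a list of Java beans and send that list over ActiveMQ, it fails, and honestly I'm at a bit of a loss.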

Transform route

@Component
public class TransformRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        DataFormat bind = new BindyCsvDataFormat(UserBean.class);

        from("activemq:unpacked")
            .routeId("transformRoute")
            .unmarshal(bind)
            .marshal().json(JsonLibrary.Jackson, true)
            .log("${body}")
            .to("activemq:save");
    }
}

Persist route

@Component
public class PersistRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        from("activemq:save")
            .routeId("persistRoute")
            .log("Received message at PersistRoute")
            .to("mongodb:connectionBean?database=test&collection=user&operation=save")
            .setBody(simple("${body}"));
    }
}

UserBean

@Getter
@Setter
@NoArgsConstructor
@CsvRecord(separator = ",", isOrdered = true)
public class UserBean implements Serializable {

    @DataField(pos = 1)
    private Integer id;
    @DataField(pos = 2)
    private String firstName;
    @DataField(pos = 3)
    private String surName;
    @DataField(pos = 4)
    private String workEmail;
    @DataField(pos = 5)
    private String personalEmail;
    @DataField(pos = 6)
    private String profession;
}

Error

Message History (source location and message history is disabled)
---------------------------------------------------------------------------------------------------------------------------------------
Source                                   ID                             Processor                                          Elapsed (ms)
                                         persistRoute/persistRoute      from[activemq://save]                                   3800927
    ...
                                         persistRoute/to4               mongodb:connectionBean?database=test&collection=us            0

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------

org.apache.camel.component.mongodb.CamelMongoDbException: Body incorrect type for save
    at org.apache.camel.component.mongodb.MongoDbProducer.lambda$createDoSave$14(MongoDbProducer.java:643) ~[camel-mongodb-3.19.0.jar:3.19.0]
    at org.apache.camel.component.mongodb.MongoDbProducer.lambda$wrap$0(MongoDbProducer.java:260) ~[camel-mongodb-3.19.0.jar:3.19.0]
    at org.apache.camel.component.mongodb.MongoDbProducer.invokeOperation(MongoDbProducer.java:138) ~[camel-mongodb-3.19.0.jar:3.19.0]
    at org.apache.camel.component.mongodb.MongoDbProducer.process(MongoDbProducer.java:125) ~[camel-mongodb-3.19.0.jar:3.19.0]
    at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66) ~[camel-support-3.19.0.jar:3.19.0]
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172) ~[camel-core-processor-3.19.0.jar:3.19.0]
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477) ~[camel-core-processor-3.19.0.jar:3.19.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181) ~[camel-base-engine-3.19.0.jar:3.19.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59) ~[camel-base-engine-3.19.0.jar:3.19.0]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:175) ~[camel-core-processor-3.19.0.jar:3.19.0]
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392) ~[camel-base-engine-3.19.0.jar:3.19.0]
    at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) ~[camel-base-engine-3.19.0.jar:3.19.0]
    at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41) ~[camel-support-3.19.0.jar:3.19.0]
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:132) ~[camel-jms-3.19.0.jar:3.19.0]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736) ~[spring-jms-5.3.23.jar:5.3.23]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696) ~[spring-jms-5.3.23.jar:5.3.23]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.3.23.jar:5.3.23]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:331) ~[spring-jms-5.3.23.jar:5.3.23]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:270) ~[spring-jms-5.3.23.jar:5.3.23]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1237) ~[spring-jms-5.3.23.jar:5.3.23]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1227) ~[spring-jms-5.3.23.jar:5.3.23]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1120) ~[spring-jms-5.3.23.jar:5.3.23]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.apache.camel.InvalidPayloadException: No body available of type: org.bson.Document but has type: byte[] on: JmsMessage[JmsMessageID: ID:atlantis-35375-1665732206912-51:1:3:1:1]. Caused by: Error during type conversion from type: byte[] to the required type: org.bson.Document with value [B@7f2e6fb due to org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is ARRAY.. Exchange[]. Caused by: [org.apache.camel.TypeConversionException - Error during type conversion from type: byte[] to the required type: org.bson.Document with value [B@7f2e6fb due to org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is ARRAY.]
    at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:125) ~[camel-support-3.19.0.jar:3.19.0]
    at org.apache.camel.component.mongodb.MongoDbProducer.lambda$createDoSave$14(MongoDbProducer.java:627) ~[camel-mongodb-3.19.0.jar:3.19.0]
    ... 24 common frames omitted
Caused by: org.apache.camel.TypeConversionException: Error during type conversion from type: byte[] to the required type: org.bson.Document with value [B@7f2e6fb due to org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is ARRAY.
    at org.apache.camel.support.SimpleTypeConverter.convertTo(SimpleTypeConverter.java:105) ~[camel-support-3.19.0.jar:3.19.0]
    at org.apache.camel.impl.converter.CoreTypeConverterRegistry.doConvertTo(CoreTypeConverterRegistry.java:479) ~[camel-base-3.19.0.jar:3.19.0]
    at org.apache.camel.impl.converter.CoreTypeConverterRegistry.doConvertTo(CoreTypeConverterRegistry.java:358) ~[camel-base-3.19.0.jar:3.19.0]
    at org.apache.camel.impl.converter.CoreTypeConverterRegistry.mandatoryConvertTo(CoreTypeConverterRegistry.java:271) ~[camel-base-3.19.0.jar:3.19.0]
    at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:123) ~[camel-support-3.19.0.jar:3.19.0]
    ... 25 common frames omitted
Caused by: org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is ARRAY.
    at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:689) ~[bson-4.7.1.jar:na]
    at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:721) ~[bson-4.7.1.jar:na]
    at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:449) ~[bson-4.7.1.jar:na]
    at org.bson.codecs.DocumentCodec.decode(DocumentCodec.java:177) ~[bson-4.7.1.jar:na]
    at org.apache.camel.component.mongodb.converters.MongoDbBasicConverters.fromByteArrayToDocument(MongoDbBasicConverters.java:91) ~[camel-mongodb-3.19.0.jar:3.19.0]
    at org.apache.camel.component.mongodb.converters.MongoDbBasicConvertersLoader.lambda$registerConverters$2(MongoDbBasicConvertersLoader.java:49) ~[camel-mongodb-3.19.0.jar:3.19.0]
    at org.apache.camel.support.SimpleTypeConverter.convertTo(SimpleTypeConverter.java:101) ~[camel-support-3.19.0.jar:3.19.0]
    ... 29 common frames omitted

2022-10-14 10:36:38.570  WARN 5841 --- [sConsumer[save]] o.a.c.c.jms.EndpointMessageListener      : Execution of JMS message listener failed. Caused by: [org.apache.camel.TypeConversionException - Error during type conversion from type: byte[] to the required type: org.bson.Document with value [B@7f2e6fb due to org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is ARRAY.]

org.apache.camel.TypeConversionException: Error during type conversion from type: byte[] to the required type: org.bson.Document with value [B@7f2e6fb due to org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is ARRAY.
    at org.apache.camel.support.SimpleTypeConverter.convertTo(SimpleTypeConverter.java:105) ~[camel-support-3.19.0.jar:3.19.0]
    at org.apache.camel.impl.converter.CoreTypeConverterRegistry.doConvertTo(CoreTypeConverterRegistry.java:479) ~[camel-base-3.19.0.jar:3.19.0]
    at org.apache.camel.impl.converter.CoreTypeConverterRegistry.doConvertTo(CoreTypeConverterRegistry.java:358) ~[camel-base-3.19.0.jar:3.19.0]
    at org.apache.camel.impl.converter.CoreTypeConverterRegistry.mandatoryConvertTo(CoreTypeConverterRegistry.java:271) ~[camel-base-3.19.0.jar:3.19.0]
    at org.apache.camel.support.MessageSupport.getMandatoryBody(MessageSupport.java:123) ~[camel-support-3.19.0.jar:3.19.0]
    at org.apache.camel.component.mongodb.MongoDbProducer.lambda$createDoSave$14(MongoDbProducer.java:627) ~[camel-mongodb-3.19.0.jar:3.19.0]
    at org.apache.camel.component.mongodb.MongoDbProducer.lambda$wrap$0(MongoDbProducer.java:260) ~[camel-mongodb-3.19.0.jar:3.19.0]
    at org.apache.camel.component.mongodb.MongoDbProducer.invokeOperation(MongoDbProducer.java:138) ~[camel-mongodb-3.19.0.jar:3.19.0]
    at org.apache.camel.component.mongodb.MongoDbProducer.process(MongoDbProducer.java:125) ~[camel-mongodb-3.19.0.jar:3.19.0]
    at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66) ~[camel-support-3.19.0.jar:3.19.0]
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172) ~[camel-core-processor-3.19.0.jar:3.19.0]
    at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477) ~[camel-core-processor-3.19.0.jar:3.19.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181) ~[camel-base-engine-3.19.0.jar:3.19.0]
    at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59) ~[camel-base-engine-3.19.0.jar:3.19.0]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:175) ~[camel-core-processor-3.19.0.jar:3.19.0]
    at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392) ~[camel-base-engine-3.19.0.jar:3.19.0]
    at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) ~[camel-base-engine-3.19.0.jar:3.19.0]
    at org.apache.camel.support.AsyncProcessorSupport.process(AsyncProcessorSupport.java:41) ~[camel-support-3.19.0.jar:3.19.0]
    at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:132) ~[camel-jms-3.19.0.jar:3.19.0]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736) ~[spring-jms-5.3.23.jar:5.3.23]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696) ~[spring-jms-5.3.23.jar:5.3.23]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.3.23.jar:5.3.23]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:331) ~[spring-jms-5.3.23.jar:5.3.23]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:270) ~[spring-jms-5.3.23.jar:5.3.23]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1237) ~[spring-jms-5.3.23.jar:5.3.23]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1227) ~[spring-jms-5.3.23.jar:5.3.23]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1120) ~[spring-jms-5.3.23.jar:5.3.23]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.bson.BsonInvalidOperationException: readStartDocument can only be called when CurrentBSONType is DOCUMENT, not when CurrentBSONType is ARRAY.
    at org.bson.AbstractBsonReader.verifyBSONType(AbstractBsonReader.java:689) ~[bson-4.7.1.jar:na]
    at org.bson.AbstractBsonReader.checkPreconditions(AbstractBsonReader.java:721) ~[bson-4.7.1.jar:na]
    at org.bson.AbstractBsonReader.readStartDocument(AbstractBsonReader.java:449) ~[bson-4.7.1.jar:na]
    at org.bson.codecs.DocumentCodec.decode(DocumentCodec.java:177) ~[bson-4.7.1.jar:na]
    at org.apache.camel.component.mongodb.converters.MongoDbBasicConverters.fromByteArrayToDocument(MongoDbBasicConverters.java:91) ~[camel-mongodb-3.19.0.jar:3.19.0]
    at org.apache.camel.component.mongodb.converters.MongoDbBasicConvertersLoader.lambda$registerConverters$2(MongoDbBasicConvertersLoader.java:49) ~[camel-mongodb-3.19.0.jar:3.19.0]
    at org.apache.camel.support.SimpleTypeConverter.convertTo(SimpleTypeConverter.java:101) ~[camel-support-3.19.0.jar:3.19.0]
    ... 29 common frames omitted

Logged data

[ {
  "id" : 100,
  "firstName" : "Alexine",
  "surName" : "McClimans",
  "workEmail" : "Alexine.McClimans@yopmail.com",
  "personalEmail" : "Alexine.McClimans@gmail.com",
  "profession" : "police officer"
}, {
  "id" : 101,
  "firstName" : "Emmey",
  "surName" : "Wildermuth",
  "workEmail" : "Emmey.Wildermuth@yopmail.com",
  "personalEmail" : "Emmey.Wildermuth@gmail.com",
  "profession" : "firefighter"
} ]

What am I missing?

ffvjumwh  #1

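The save operation does not work with a collection of documents; it expects exactly one document.
The error you get comes from the fact that the component assumes you are trying to save a single document. Because the body could be either JSON or BSON, it first tries to detect which of the two it is, and here it (wrongly) detected BSON, which is why you end up with this odd error.
In your case, you should instead use the insert operation, which supports a document, a collection of documents, or an array of documents.
Your route should look like this: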

from("activemq:save")
    .routeId("persistRoute")
    .log("Received message at PersistRoute")
    .to("mongodb:connectionBean?database=test&collection=user&operation=insert")
    .setBody(simple("${body}"));
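If the queue can carry both kinds of message (a single bean or a whole list), one option is to look at the shape of the JSON payload before choosing the operation. The following is only a minimal sketch in plain Java: the class MongoOperationChooser and its method chooseOperation are hypothetical names made up for this illustration, they are not part of Camel, and this is not how camel-mongodb itself inspects the body. It assumes the payload is UTF-8 JSON like the logged data in the question.

```java
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper (not part of Camel): suggests a camel-mongodb
 * operation name based on the shape of a JSON payload.
 */
final class MongoOperationChooser {

    private MongoOperationChooser() {
    }

    /**
     * Returns "insert" when the first non-whitespace byte is '[' (a JSON array)
     * and "save" when it is '{' (a single JSON object).
     */
    static String chooseOperation(byte[] payload) {
        for (byte b : payload) {
            if (b == ' ' || b == '\n' || b == '\r' || b == '\t') {
                continue; // skip leading JSON whitespace
            }
            if (b == '[') {
                return "insert"; // a list of documents
            }
            if (b == '{') {
                return "save"; // exactly one document
            }
            break; // any other first character is neither an object nor an array
        }
        throw new IllegalArgumentException("Payload is neither a JSON object nor a JSON array");
    }

    public static void main(String[] args) {
        byte[] list = "[ { \"id\" : 100 }, { \"id\" : 101 } ]".getBytes(StandardCharsets.UTF_8);
        byte[] single = "{ \"id\" : 100 }".getBytes(StandardCharsets.UTF_8);
        System.out.println(chooseOperation(list));
        System.out.println(chooseOperation(single));
    }
}
```

For the payload shown in the question, which starts with "[ {", this helper would pick insert. How the chosen name is passed to the endpoint depends on your setup; the camel-mongodb documentation describes overriding the endpoint's operation per exchange through a message header, so check it for the Camel version you are using.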
