将CSV文件转换为Java对象(POJO)并将其发送到ActiveMQ队列

k2arahey  于 12个月前  发布在  Java
关注(0)|答案(1)|浏览(114)

我的目标是读取csv文件,将其转换为Java对象(POJO),并将Java对象逐个发送到JMS队列。下面是代码:

public void configure() throws Exception {
    from("file:src/main/resources?fileName=data.csv")               
    .unmarshal(bindy)
    .split(body())
    .to("file:src/main/resources/?fileName=equityfeeds.txt")
    .split().tokenize(",").streaming().to("jms:queue:javaobjects.upstream.queue");          
}

**问题:**1.当我执行代码时,没有文件(equityfeeds. txt)被创建,也没有对象进入队列。你怎么了?我现在不需要做任何处理。我只需要将csv解组为POJO,并将Java对象逐个发送到JMS队列。
EquityFeeds(POJO)

@CsvRecord(separator = ",", skipFirstLine = true)
public class EquityFeeds {
    
    @DataField(pos = 1) 
    private String externalTransactionId;
    
    @DataField(pos = 2)
    private String clientId;
    
    @DataField(pos = 3)
    private String securityId;
    
    @DataField(pos = 4)
    private String transactionType;
    
    @DataField(pos = 5, pattern = "dd/MM/YY")
    private Date transactionDate;
    
    @DataField(pos = 6)
    private float marketValue; 
    
    @DataField(pos = 7)
    private String priorityFlag;

请帮帮忙。请告诉我我错在哪里。
@pvpkiran:下面是我的Camel Code for producer:

public void configure() throws Exception {
    from("file:src/main/resources?fileName=data.csv")               
        .unmarshal(bindy)
        .split(body())
        .streaming().to("jms:queue:javaobjects.upstream.queue");
}

下面是我的消费者代码(使用JMS API):

@JmsListener(destination = "javaobjects.upstream.queue")
public void javaObjectsListener(final Message objectMessage) throws JMSException {
    Object messageData = null;
    if(objectMessage instanceof ObjectMessage) {
        ObjectMessage objMessage = (ObjectMessage) objectMessage;
        messageData = objMessage.getObject();
    }
    System.out.println("Object: "+messageData.toString());
}

我没有使用Camel来使用JMS消息。在消费者中,我使用JMS API来消费消息(如上所述)。我也不测试代码。在我的终端得到NullPointerException。此外,DLQ中还有2条消息,给出以下错误消息:

java.lang.Throwable: Delivery[7] exceeds redelivery policy limit:RedeliveryPolicy {destination = null, collisionAvoidanceFactor = 0.15, maximumRedeliveries = 6, maximumRedeliveryDelay = -1, initialRedeliveryDelay = 1000, useCollisionAvoidance = false, useExponentialBackOff = false, backOffMultiplier = 5.0, redeliveryDelay = 1000, preDispatchCheck = true}, cause:null
3ks5zfa0

3ks5zfa01#

试试这个,应该可以

from("file:src/main/resources?fileName=equityfeeds.csv")
                    .unmarshal(new BindyCsvDataFormat(EquityFeeds.class))
                    .split(body())
                    .streaming().to("jms:queue:javaobjects.upstream.queue");
// This route is for Testing
from("jms:queue:javaobjects.upstream.queue").to("bean:camelBeanComponent?method=processRoute");

并编写一个消费者组件bean

@Component
public class CamelBeanComponent {
    public void processRoute(Exchange exchange) {
        System.out.println(exchange.getIn().getBody());
    }
}

打印出来(如果需要这样的输出,需要添加toString()

EquityFeeds(externalTransactionId=SAPEXTXN1, clientId=GS, securityId=ICICI, transactionType=BUY, transactionDate=Sun Dec 30 00:00:00 CET 2012, marketValue=101.9, priorityFlag=Y)
EquityFeeds(externalTransactionId=SAPEXTXN2, clientId=AS, securityId=REL, transactionType=SELL, transactionDate=Sun Dec 30 00:00:00 CET 2012, marketValue=121.9, priorityFlag=N)

如果使用.split().tokenize(","),则每行(不完整行)中的每个字段都将转换为EquityFeeds对象(其他字段为null),并作为消息发送到队列

相关问题