CSV: converting an InputStream to a POJO

tp5buhyn  posted on 2023-09-28  in  Other

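I am currently using Spring Integration to fetch a .csv file over SFTP, process it, and send it to Kafka. I need to convert this .csv file into POJOs. The file arrives as an InputStream, which I split line by line with Files.splitter() for processing. I tried ObjectInputStream, but because the conversion happens line by line, it does not seem to work. Does anyone have a better suggestion or approach for converting this .csv InputStream into a POJO, or can you point me in the right direction?
Code: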

    @Bean
    public IntegrationFlow sftpFileTransferFlow(SessionFactory<SftpClient.DirEntry> sftpSessionFactory,
                                                IntegrationFlowProperties properties,
                                                MessageChannel inboundFilesMessageChannel) {

        return IntegrationFlow
                .from(Sftp.inboundStreamingAdapter(new RemoteFileTemplate<>(sftpSessionFactory))
                          .filter(new SftpRegexPatternFileListFilter(properties.getRemoteFilePattern()))
                          .remoteDirectory(properties.getRemoteDirectory()),
                      e -> e.id("sftpInboundAdapter")
                            .autoStartup(true)
                            .poller(Pollers.fixedRate(5000)))
                .log(LoggingHandler.Level.DEBUG, "DataSftpToKafkaIntegrationFlow",
                     "headers['file_remoteDirectory'] + T(java.io.File).separator + headers['file_remoteFile']")
                .channel(inboundFilesMessageChannel)
                .get();
    }

    @Bean
    public IntegrationFlow readCsvFileFlow(MessageChannel inboundFilesMessageChannel,
                                           QueueChannel kafkaPojoMessageChannel) {
        
        return IntegrationFlow.from(inboundFilesMessageChannel)
                              .split(Files.splitter()
                                          .markers(true)
                                          .charset(StandardCharsets.UTF_8)
                                          .firstLineAsHeader("myHeaders")
                                          .applySequence(true))
                              // .transform(new StreamToEmailInteractionConverter()) // TODO: Figure this out so it doesn't get put on topic as .csv
                              .transform(new ObjectToJsonTransformer())
                              .log(LoggingHandler.Level.DEBUG,
                                   "DataSftpToKafkaIntegrationFlow",
                                   m -> "Payload: " + m.getPayload())
                              .channel(kafkaPojoMessageChannel)
                              .get();
    }

Domain class:

@Getter
@Setter
public class EmailInteractionTest {
    @CsvBindByName(column = "email")
    private String email;
    @CsvBindByName(column = "RECIPIENT_ID")
    private String recipientId;
    @CsvBindByName(column = "ENCODED_RECIPIENT_ID")
    private String encodedRecipientId;
    @CsvBindByName(column = "contactId")
    private String contactId;
    @CsvBindByName(column = "code")
    private String code;
    @CsvBindByName(column = "messageId")
    private String messageId;
    @CsvBindByName(column = "userAgent")
    private String userAgent;
    @CsvBindByName(column = "messageName")
    private String messageName;
    @CsvBindByName(column = "mailingTemplateId")
    private String mailingTemplateId;
    @CsvBindByName(column = "subjectLine")
    private String subjectLine;
    @CsvBindByName(column = "docType")
    private String docType;
    @CsvBindByName(column = "reportId")
    private String reportId;
    @CsvBindByName(column = "sendType")
    private String sendType;
    @CsvBindByName(column = "bounceType")
    private String bounceType;
    @CsvBindByName(column = "urlDescription")
    private String urlDescription;
    @CsvBindByName(column = "clickUrl")
    private String clickUrl;
    @CsvBindByName(column = "optOutDetails")
    private String optOutDetails;
    @CsvBindByName(column = "messageGroupId")
    private String messageGroupId;
    @CsvBindByName(column = "programId")
    private String programId;
    @CsvBindByName(column = "timestamp")
    private String timestamp;
    @CsvBindByName(column = "originatedFrom")
    private String originatedFrom;
    @CsvBindByName(column = "eventId")
    private String eventId;
    @CsvBindByName(column = "externalSystemName")
    private String externalSystemName;
    @CsvBindByName(column = "externalSystemReferenceId")
    private String externalSystemReferenceId;
    @CsvBindByName(column = "trackingCode")
    private String trackingCode;
}

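StreamToEmailInteractionConverter is the class I need to implement. It currently implements the GenericTransformer interface, so I can plug it into the .transform() call in the readCsvFileFlow method. Any advice or help would be greatly appreciated. I also have sample .csv data that I can attach if you would like. Thanks.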

v64noz0r  #1

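Files.splitter() emits one message for each line of the input file, with a String as the payload. ObjectToJsonTransformer is therefore wrong here, because the payload for each line is already a String. If that String is in JSON format, you could look at JsonToObjectTransformer. Otherwise, you need to come up with your own logic to convert that String into an EmailInteractionTest (assuming that is the intended representation of each line in the file).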
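
As a rough illustration of what such "own logic" might look like, here is a minimal sketch in plain Java that pairs a header line with a data line and copies the values into a POJO by column name. Everything in it is hypothetical: CsvLineMapper and its methods are not part of Spring Integration or OpenCSV, and EmailInteraction is a three-field stand-in for EmailInteractionTest. It also assumes a record never spans more than one line. In the flow from the question, the header line should be available in the message header named "myHeaders" (because of firstLineAsHeader("myHeaders")), and since markers(true) is set, the splitter also emits FileMarker payloads that would need to be filtered out before a transformer like this sees them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not a Spring Integration or OpenCSV class.
final class CsvLineMapper {

    // Trimmed-down stand-in for EmailInteractionTest (three of its fields only).
    static final class EmailInteraction {
        String email;
        String recipientId;
        String messageId;
    }

    // Splits one CSV line on commas, honouring double-quoted fields and "" escapes.
    // Assumption: a record never contains a line break.
    static List<String> splitLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"' && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                    current.append('"'); // escaped quote inside a quoted field
                    i++;
                } else if (c == '"') {
                    inQuotes = false;    // closing quote
                } else {
                    current.append(c);
                }
            } else if (c == '"') {
                inQuotes = true;         // opening quote
            } else if (c == ',') {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());  // last field (possibly empty)
        return fields;
    }

    // Pairs each column name from the header line with the value at the same position.
    static Map<String, String> toRow(String headerLine, String dataLine) {
        List<String> names = splitLine(headerLine);
        List<String> values = splitLine(dataLine);
        if (names.size() != values.size()) {
            throw new IllegalArgumentException(
                    "Expected " + names.size() + " columns but got " + values.size());
        }
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < names.size(); i++) {
            row.put(names.get(i).trim(), values.get(i));
        }
        return row;
    }

    // Copies the columns of interest into the POJO, looked up by column name.
    static EmailInteraction toPojo(String headerLine, String dataLine) {
        Map<String, String> row = toRow(headerLine, dataLine);
        EmailInteraction pojo = new EmailInteraction();
        pojo.email = row.get("email");
        pojo.recipientId = row.get("RECIPIENT_ID");
        pojo.messageId = row.get("messageId");
        return pojo;
    }
}
```

A GenericTransformer implementation could then delegate to something like CsvLineMapper.toPojo(headerLine, payload). Since the domain class already carries OpenCSV's @CsvBindByName annotations, feeding the header line plus the data line to OpenCSV's CsvToBeanBuilder may be a better fit than hand-written mapping; the sketch above only shows the general shape of the conversion.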
