我目前正在使用spring集成通过sftp获取一个.csv文件并处理它并发送到Kafka。我需要将这个.csv文件(它是一个输入流,我使用Files.splitter()
逐行拆分以进行处理)转换为POJO。我尝试过使用ObjectInputStream,但由于转换是逐行进行的,所以似乎不起作用。我想知道是否有人有更好的建议或方法将此.csv InputStream转换为POJO,或者可以为我指出正确的方向。
产品编号:
@Bean
public IntegrationFlow sftpFileTransferFlow(SessionFactory<SftpClient.DirEntry> sftpSessionFactory,
IntegrationFlowProperties properties,
MessageChannel inboundFilesMessageChannel) {
return IntegrationFlow
.from(Sftp.inboundStreamingAdapter(new RemoteFileTemplate<>(sftpSessionFactory))
.filter(new SftpRegexPatternFileListFilter(properties.getRemoteFilePattern()))
.remoteDirectory(properties.getRemoteDirectory()),
e -> e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedRate(5000)))
.log(LoggingHandler.Level.DEBUG, "DataSftpToKafkaIntegrationFlow",
"headers['file_remoteDirectory'] + + T(java.io.File).separator + headers['file_remoteFile']")
.channel(inboundFilesMessageChannel)
.get();
}
@Bean
public IntegrationFlow readCsvFileFlow(MessageChannel inboundFilesMessageChannel,
QueueChannel kafkaPojoMessageChannel) {
return IntegrationFlow.from(inboundFilesMessageChannel)
.split(Files.splitter()
.markers(true)
.charset(StandardCharsets.UTF_8)
.firstLineAsHeader("myHeaders")
.applySequence(true))
// .transform(new StreamToEmailInteractionConverter()) // TODO: Figure this out so it doesn't get put on topic as .csv
.transform(new ObjectToJsonTransformer())
.log(LoggingHandler.Level.DEBUG,
"DataSftpToKafkaIntegrationFlow",
m -> "Payload: " + m.getPayload())
.channel(kafkaPojoMessageChannel)
.get();
}
域类:
@Getter
@Setter
public class EmailInteractionTest {
@CsvBindByName(column = "email")
private String email;
@CsvBindByName(column = "RECIPIENT_ID")
private String recipientId;
@CsvBindByName(column = "ENCODED_RECIPIENT_ID")
private String encodedRecipientId;
@CsvBindByName(column = "contactId")
private String contactId;
@CsvBindByName(column = "code")
private String code;
@CsvBindByName(column = "messageId")
private String messageId;
@CsvBindByName(column = "userAgent")
private String userAgent;
@CsvBindByName(column = "messageName")
private String messageName;
@CsvBindByName(column = "mailingTemplateId")
private String mailingTemplateId;
@CsvBindByName(column = "subjectLine")
private String subjectLine;
@CsvBindByName(column = "docType")
private String docType;
@CsvBindByName(column = "reportId")
private String reportId;
@CsvBindByName(column = "sendType")
private String sendType;
@CsvBindByName(column = "bounceType")
private String bounceType;
@CsvBindByName(column = "urlDescription")
private String urlDescription;
@CsvBindByName(column = "clickUrl")
private String clickUrl;
@CsvBindByName(column = "optOutDetails")
private String optOutDetails;
@CsvBindByName(column = "messageGroupId")
private String messageGroupId;
@CsvBindByName(column = "programId")
private String programId;
@CsvBindByName(column = "timestamp")
private String timestamp;
@CsvBindByName(column = "originatedFrom")
private String originatedFrom;
@CsvBindByName(column = "eventId")
private String eventId;
@CsvBindByName(column = "externalSystemName")
private String externalSystemName;
@CsvBindByName(column = "externalSystemReferenceId")
private String externalSystemReferenceId;
@CsvBindByName(column = "trackingCode")
private String trackingCode;
}
StreamToEmailInteractionConverter()
是我需要实现的类。它目前实现了GenericTransformer
接口,因此我可以将其插入readCsvFileFlow
方法中的transformer
方法。任何建议或帮助将不胜感激。我也有样本.csv数据,如果你想让我附上,以及。谢谢
1条答案
按热度按时间v64noz0r1#
Files.splitter()
为输入文件中的每一行生成消息,并将字符串作为有效负载。因此,ObjectToJsonTransformer
在这里是错误的,因为每行的有效负载已经是一个字符串。您可以查看JsonToObjectTransformer
,如果该字符串是JSON格式。否则,您需要提出一个逻辑,将该字符串转换为EmailInteractionTest
(如果这是文件中每行的结果表示)。