我正在通过flink读取csv文件。csv文件有特定的列数。
我已经定义了
RowCsvInputFormat format = new RowCsvInputFormat(filePath,
new TypeInformation[]{
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO
});
如果文件中的所有行都有适当的4列,那么代码工作正常。
我想处理这样一种情况:文件中的几行没有4列,或者几行中存在任何其他问题。
我怎样才能在Flink做到这一点。
1条答案
按热度按时间wkftcu5l1#
如果你看一下wikipedia或rfc4180上的规范,csv文件似乎应该只包含具有相同列数的行。所以rowcsvinputformat不支持这一点是有道理的。
您可以使用readtextfile(路径)读取文件,然后在
flatMap()
运算符将字符串解析为行对象(如果行中有问题,则忽略)