flink:如何在csv中实现没有实际列数的typeinformation

kmb7vmvb  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(482)

我正在通过flink读取csv文件。csv文件有特定的列数。
我已经定义了

RowCsvInputFormat format = new RowCsvInputFormat(filePath, 
            new TypeInformation[]{  
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO
    });

如果文件中的所有行都有适当的4列,那么代码工作正常。
我想处理这样一种情况:文件中的几行没有4列,或者几行中存在任何其他问题。
我怎样才能在Flink做到这一点。

wkftcu5l

wkftcu5l1#

如果你看一下wikipedia或rfc4180上的规范,csv文件似乎应该只包含具有相同列数的行。所以rowcsvinputformat不支持这一点是有道理的。
您可以使用readtextfile(路径)读取文件,然后在 flatMap() 运算符将字符串解析为行对象(如果行中有问题,则忽略)

env.readTextFile(params.get("input"))
   .flatMap(someCsvRowParseFunction())

相关问题