我正在尝试使用flink编写一个csv文件作为Parquet。我正在使用下面的代码并得到错误。
val parquetFormat = new HadoopOutputFormat[Void, String](new AvroParquetOutputFormat, job)
FileOutputFormat.setOutputPath(job, new Path(outputPath))
我得到以下生成错误。有人能帮忙吗?
类型不匹配;发现:parquet.avro.avroparquetoutputformat必需:org.apache.hadoop.mapreduce.outputformat[void,string]insetation.scala/flink scala/src/main/scala/com/sc/edl/flink line 75 scala problem
3条答案
按热度按时间4bbkushb1#
扩展另一个答案,您可以通过拖放到java来示例化所需的void类型:
aydmsdu92#
你想创建一个
HadoopOutputFormat[Void, String]
这需要一个OutputFormat[Void, String]
.你提供了一个
AvroParquetOutputFormat
延伸到ParquetOutputFormat<IndexedRecord>
.ParquetOutputFormat
定义为ParquetOutputFormat<T> extends FileOutputFormat<Void, T>
.所以你提供了一个
OutputFormat[Void, IndexedRecord]
而HadoopOutputFormat[Void, String]
期望OutputFormat[Void, String]
.你应该改变
parquetFormat
到如果
DataSet
你想写的不是类型(Void, IndexedRecord)
,您应该添加MapFunction
将数据转换为(Void, IndexedRecord)
对。9wbgstp73#
问题仍然存在,因为到目前为止flink元组不支持空键。将发生以下错误:
Caused by: org.apache.flink.types.NullFieldException: Field 1 is null, but expected to hold a value.
更好的选择是使用kitesdk,如本例所述:https://github.com/nezihyigitbasi/flinkparquet 因此,如果您需要动态模式,那么这种方法将不起作用,因为您需要严格遵守模式。此外,这是更好的阅读,而不是写作。spark dataframe不仅在api方面而且在性能方面与parquet配合得非常好。但是,如果你想使用flink,那么你需要等待flink社区发布api或者自己编辑Parquethadoop代码,这可能是一项巨大的工作。
只有这些连接器还没有实现https://github.com/apache/flink/tree/master/flink-connectors 所以,我个人的建议是,如果你能使用spark,那就去吧,考虑到生产用例,它有更成熟的api。当你坚持flink的基本需求时,你可能也会被困在其他地方。
别浪费时间找一个与Flink约现在,我已经浪费了很多我的关键时间,而不是去与标准的选择,如Hive,Spark或先生。