我有一个关于hdfs的Parquet文件。它每天都被一个新的覆盖。我的目标是使用datastream api在flink作业中以数据流的形式连续地发出这个Parquet文件(当它发生变化时)。最终目标是在广播状态下使用文件内容,但这超出了这个问题的范围。
要连续处理文件,有一个非常有用的api:关于数据源的数据源。更具体地说,fileprocessingmode.process\u:这正是我需要的。这适用于读取/监视文本文件,没问题,但不适用于Parquet文件:
// Partial version 1: the raw file is processed continuously
val path: String = "hdfs://hostname/path_to_file_dir/"
val textInputFormat: TextInputFormat = new TextInputFormat(new Path(path))
// monitor the file continuously every minute
val stream: DataStream[String] = streamExecutionEnvironment.readFile(textInputFormat, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 60000)
为了处理Parquet文件,我可以使用hadoop输入格式,使用这个api:usinghadoopinputformats。但是,此api没有fileprocessingmode参数,并且仅处理文件一次:
// Partial version 2: the parquet file is only processed once
val parquetPath: String = "/path_to_file_dir/parquet_0000"
// raw text format
val hadoopInputFormat: HadoopInputFormat[Void, ArrayWritable] = HadoopInputs.readHadoopFile(new MapredParquetInputFormat(), classOf[Void], classOf[ArrayWritable], parquetPath)
val stream: DataStream[(Void, ArrayWritable)] = streamExecutionEnvironment.createInput(hadoopInputFormat).map { record =>
// process the record here ...
}
我想以某种方式结合这两个api,通过datastreamapi连续处理Parquet文件。你们有人试过这样的吗?
1条答案
按热度按时间dxpyg8gm1#
在浏览完flink的代码之后,看起来这两个api是相对不同的,似乎不可能将它们合并在一起。
另一种方法(我将在这里详述)是定义自己的sourcefunction,它将定期读取文件:
然后,使用streamexecutionenvironment注册此源: