flink不从s3读取子文件夹

vsmadaxz  于 2021-05-29  发布在  Hadoop
关注(0)|答案(3)|浏览(613)

我们使用的是带有建议的s3afilesystem配置的flink 1.2.0。当一个简单的流作业的源是s3 bucket中的一个文件夹时,它就可以正常工作。
当作业的源是一个本身包含子文件夹的文件夹时,该作业运行时不会出错,但不会产生输出。
为清楚起见,下面是s3铲斗的模型。运行作业以指向 s3a://bucket/folder/2017/04/25/01/ 正确读取所有三个对象以及bucket中出现的任何后续对象。将作业指向 s3a://bucket/folder/2017/ (或任何其他中间文件夹)导致运行时不产生任何内容的作业。
在一阵绝望中,我们尝试了包括尾随的排列 / .

.
`-- folder
    `-- 2017
        `-- 04
            |-- 25
            |   |-- 00
            |   |   |-- a.txt
            |   |   `-- b.txt
            |   `-- 01
            |       |-- c.txt
            |       |-- d.txt
            |       `-- e.txt
            `-- 26

职务代码:

def main(args: Array[String]) {

  val parameters = ParameterTool.fromArgs(args)
  val bucket = parameters.get("bucket")
  val folder = parameters.get("folder")

  val path = s"s3a://$bucket/$folder"

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val lines: DataStream[String] = env.readFile(
    inputFormat = new TextInputFormat(new Path(path)),
    filePath = path,
    watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
    interval = Time.seconds(10).toMilliseconds)

  lines.print()
  env.execute("Flink Streaming Scala API Skeleton")
}

core-site.xml是根据以下文档配置的:

<configuration>
  <property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
</configuration>

我们提供了s3afilesystem的所有jar,如下所示:https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/aws.html#flink-for-hadoop-27版本
我们被难住了。这似乎应该管用;互联网上有大量的面包屑表明这确实有效[例如。,http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/reading-files-from-an-s3-folder-td10281.html]
帮帮我,松鼠们。。。你是我唯一的希望!

krugob8w

krugob8w1#

回答我自己的问题。。。在史蒂夫·拉夫兰的帮助下。
在flink中,使用基于文件的数据源进行连续处理时, FileInputFormat 默认情况下不计算嵌套文件。
不管源代码是s3还是其他任何东西,这都是正确的。
必须这样设置:

def main(args: Array[String]) {

  val parameters = ParameterTool.fromArgs(args)
  val bucket = parameters.get("bucket")
  val folder = parameters.get("folder")

  val path = s"s3a://$bucket/$folder"

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val textInputFormat = new TextInputFormat(new Path(path))

  //this is important!
  textInputFormat.setNestedFileEnumeration(true)

  val lines: DataStream[String] = env.readFile(
    inputFormat = textInputFormat,
    filePath = path,
    watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
    interval = Time.seconds(10).toMilliseconds)

  lines.print()
  env.execute("Flink Streaming Scala API Skeleton")

}

uxhixvfz

uxhixvfz2#

与flink 1.7.x版本一样,flink提供了两个与amazon s3对话的文件系统, flink-s3-fs-presto 以及 flink-s3-fs-hadoop . 两者 flink-s3-fs-hadoop 以及 flink-s3-fs-presto 向注册URI的默认文件系统 Package 器 s3:// 方案, flink-s3-fs-hadoop 还注册 s3a:// 以及 flink-s3-fs-presto 还注册 s3p:// ,因此可以同时使用这两者。
示例代码:

//Reading Data from S3
// This will print all the contents in the bucket line wise
final Path directory = new Path("s3a://husnain28may2020/");
final FileSystem fs = directory.getFileSystem();

//using input format
org.apache.flink.api.java.io.TextInputFormat textInputFormatS3 = new org.apache.flink.api.java.io.TextInputFormat(directory);
DataSet<String> linesS3 = env.createInput(textInputFormatS3);
linesS3.print();
vwhgwdsa

vwhgwdsa3#

下面是什么版本的hadoop?
如果这在hadoop2.8中停止了,可能是一种回归,也就是说可能是我的错。首先在flink下归档一个jira@issues.apache.org,然后,如果它在2.8.0中是新的,则将其链接为被hadoop-13208破坏
这里的代码片段是一个很好的例子,可以用于回归测试,现在是我为flink做一些测试的时候了。
那么大 listFiles() change将路径下的文件枚举从递归treewalk移动到路径下所有子条目的一系列平面列表:它对其他所有内容(distcp、tests、hive、spark)都非常有效,并从12月16日开始在产品中销售;如果这是原因的话,我会有点惊讶,但我不否认这是罪魁祸首。对不起的

相关问题