我们使用的是带有建议的s3afilesystem配置的flink 1.2.0。当一个简单的流作业的源是s3 bucket中的一个文件夹时,它就可以正常工作。
当作业的源是一个本身包含子文件夹的文件夹时,该作业运行时不会出错,但不会产生输出。
为清楚起见,下面是s3铲斗的模型。运行作业以指向 s3a://bucket/folder/2017/04/25/01/
正确读取所有三个对象以及bucket中出现的任何后续对象。将作业指向 s3a://bucket/folder/2017/
(或任何其他中间文件夹)导致运行时不产生任何内容的作业。
在一阵绝望中,我们尝试了包括尾随的排列 /
.
.
`-- folder
`-- 2017
`-- 04
|-- 25
| |-- 00
| | |-- a.txt
| | `-- b.txt
| `-- 01
| |-- c.txt
| |-- d.txt
| `-- e.txt
`-- 26
职务代码:
def main(args: Array[String]) {
val parameters = ParameterTool.fromArgs(args)
val bucket = parameters.get("bucket")
val folder = parameters.get("folder")
val path = s"s3a://$bucket/$folder"
val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines: DataStream[String] = env.readFile(
inputFormat = new TextInputFormat(new Path(path)),
filePath = path,
watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
interval = Time.seconds(10).toMilliseconds)
lines.print()
env.execute("Flink Streaming Scala API Skeleton")
}
core-site.xml是根据以下文档配置的:
<configuration>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
<name>fs.s3a.buffer.dir</name>
<value>/tmp</value>
</property>
</configuration>
我们提供了s3afilesystem的所有jar,如下所示:https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/aws.html#flink-for-hadoop-27版本
我们被难住了。这似乎应该管用;互联网上有大量的面包屑表明这确实有效[例如。,http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/reading-files-from-an-s3-folder-td10281.html]
帮帮我,松鼠们。。。你是我唯一的希望!
3条答案
按热度按时间krugob8w1#
回答我自己的问题。。。在史蒂夫·拉夫兰的帮助下。
在flink中,使用基于文件的数据源进行连续处理时,
FileInputFormat
默认情况下不计算嵌套文件。不管源代码是s3还是其他任何东西,这都是正确的。
必须这样设置:
}
uxhixvfz2#
与flink 1.7.x版本一样,flink提供了两个与amazon s3对话的文件系统,
flink-s3-fs-presto
以及flink-s3-fs-hadoop
. 两者flink-s3-fs-hadoop
以及flink-s3-fs-presto
向注册URI的默认文件系统 Package 器s3://
方案,flink-s3-fs-hadoop
还注册s3a://
以及flink-s3-fs-presto
还注册s3p://
,因此可以同时使用这两者。示例代码:
vwhgwdsa3#
下面是什么版本的hadoop?
如果这在hadoop2.8中停止了,可能是一种回归,也就是说可能是我的错。首先在flink下归档一个jira@issues.apache.org,然后,如果它在2.8.0中是新的,则将其链接为被hadoop-13208破坏
这里的代码片段是一个很好的例子,可以用于回归测试,现在是我为flink做一些测试的时候了。
那么大
listFiles()
change将路径下的文件枚举从递归treewalk移动到路径下所有子条目的一系列平面列表:它对其他所有内容(distcp、tests、hive、spark)都非常有效,并从12月16日开始在产品中销售;如果这是原因的话,我会有点惊讶,但我不否认这是罪魁祸首。对不起的