在ApachFlink中为每个处理过的输入文件生成一个输出文件

w46czmvw  于 2021-06-21  发布在  Flink
关注(0)|答案(1)|浏览(449)

我正在使用scala和apacheflink构建一个etl,它定期读取本地文件系统中某个目录下的所有文件,并将处理每个文件的结果写入另一个目录下的单个输出文件中。
例如:

/dir/to/input/files/file1
/dir/to/intput/files/fil2
/dir/to/input/files/file3

etl的输出正好是:

/dir/to/output/files/file1
/dir/to/output/files/file2
/dir/to/output/files/file3

我尝试过各种方法,包括在写入datasink时将并行处理减少到一个,但仍然无法达到所需的结果。
这是我当前的代码:

val path = "/path/to/input/files/"
   val format = new TextInputFormat(new Path(path))
   val socketStream = env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 10)

   val wordsStream = socketStream.flatMap(value => value.split(",")).map(value => WordWithCount(value,1))

   val keyValuePair = wordsStream.keyBy(_.word)

   val countPair = keyValuePair.sum("count")

   countPair.print()

   countPair.writeAsText("/path/to/output/directory/"+
     DateTime.now().getHourOfDay.toString
     +
     DateTime.now().getMinuteOfHour.toString
     +
     DateTime.now().getSecondOfMinute.toString
     , FileSystem.WriteMode.NO_OVERWRITE)

// The first write method I trid:

   val sink = new BucketingSink[WordWithCount]("/path/to/output/directory/")
   sink.setBucketer(new DateTimeBucketer[WordWithCount]("yyyy-MM-dd--HHmm"))

// The second write method I trid:

   val sink3 = new BucketingSink[WordWithCount]("/path/to/output/directory/")
   sink3.setUseTruncate(false)
   sink3.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"))
   sink3.setWriter(new StringWriter[WordWithCount])
   sink3.setBatchSize(3)
   sink3.setPendingPrefix("file-")
   sink3.setPendingSuffix(".txt")

两种书写方法都不能产生想要的结果。
有apache flink经验的人能指导我使用write方法吗。

brccelvz

brccelvz1#

我通过导入要在本地计算机上运行的下一个依赖项解决了此问题:
hadoop-aws-2.7.3.jar
aws-java-sdk-s3-1.11.183.jar文件
aws-java-sdk-core-1.11.183.jar文件
aws-java-sdk-kms-1.11.183.jar文件
jackson-annotations-2.6.7.jar文件
jackson-core-2.6.7.jar文件
jackson-databind-2.6.7.jar文件
joda-time-2.8.1.jar文件
httpcore-4.4.4.jar
httpclient-4.5.3.jar
您可以在以下位置查看:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/aws.html
“提供s3文件系统依赖关系”部分

相关问题