在我的场景中,csv文件不断上传到hdfs。
一旦上传了一个新文件,我想用sparksql处理这个新文件(例如,计算文件中某个字段的最大值,将文件转换为 parquet
). i、 在每个输入文件和转换/处理的输出文件之间有一对一的Map。
我评估spark streaming来监听hdfs目录,然后用spark处理“流文件”。
但是,为了处理整个文件,我需要知道“文件流”何时完成。我想将转换应用于整个文件,以便保留文件之间的端到端一对一Map。
如何转换整个文件而不是其微批处理?
据我所知,spark流只能将转换应用于批处理( DStreams
Map到 RDDs
)而不是一次访问整个文件(当有限流完成时)。
对吗?如果是这样,我应该为我的场景考虑什么替代方案?
2条答案
按热度按时间nwo49xxi1#
您可以使用dfsinotifyeventinputstream监视hadoop目录,然后在创建文件时以编程方式执行spark作业。
请看这篇文章:hdfs文件观察者
biswetbf2#
我可能第一次就误解了你的问题。。。
据我所知,spark流只能将转换应用于批处理(Map到RDD的数据流),而不能一次应用于整个文件(当有限流完成时)。
对吗?
不,那不对。
spark streaming将在spark streaming的批处理间隔结束时立即将转换应用于整个文件。
spark streaming将获取文件的当前内容并开始处理它。
一旦上传了一个新文件,我就需要用spark/sparksql处理这个新文件
spark几乎不可能,因为它的体系结构从“上传”到spark处理它需要一些时间。
您应该考虑使用全新的、闪亮的结构化流媒体或(即将过时的)spark流媒体。
这两种解决方案都支持监视新文件的目录,并在上载新文件后触发spark作业(这正是您的用例)。
引用结构化流媒体的输入源:
在spark 2.0中,有一些内置的源代码。
文件源-读取作为数据流写入目录中的文件。支持的文件格式有text、csv、json、parquet。请参阅datastreamreader接口的文档以获取更为最新的列表,以及每种文件格式支持的选项。请注意,文件必须以原子方式放置在给定的目录中,在大多数文件系统中,这可以通过文件移动操作实现。
另请参见spark streaming的基本来源:
除了套接字之外,streamingcontext api还提供了从作为输入源的文件创建数据流的方法。
文件流:为了从与hdfs api兼容的任何文件系统(即hdfs、s3、nfs等)上的文件读取数据,可以创建如下数据流:
spark streaming将监视目录datadirectory并处理在该目录中创建的任何文件(不支持在嵌套目录中写入的文件)。
但有一点需要注意:
我需要知道“文件流”何时完成。
不要用Spark做这个。
再次引用spark streaming的基本来源:
文件必须在datadirectory中通过原子方式移动或重命名到data目录中来创建。
一旦移动,文件就不能更改。因此,如果连续追加文件,则不会读取新数据。
结束…您应该只将文件移动到spark监视的目录中,此时文件已完成并准备好使用spark进行处理。这超出了spark的范围。