我需要实现转换csv.gz文件在一个文件夹中,在awss3和hdfs,以Parquet文件使用Spark(scala首选)。数据的一列是时间戳,我只有一周的数据集。时间戳格式为:
'年-月-日hh:mm:ss'
我想要的结果是,对于每一天,都有一个文件夹(或分区),在那里特定日期的Parquet文件位于其中。因此将有7个输出文件夹或分区。
我只有一个模糊的想法如何做到这一点,只有sc.textfile是在我的脑海中。spark中是否有一个函数可以转换为parquet?如何在s3和hdfs中实现这一点?
谢谢你的帮助。
3条答案
按热度按时间brtdzjyr1#
老主题,但我认为即使是老主题,如果回答不正确也很重要。
在spark版本>=2中,在您需要将databricks csv包导入作业之前,已经包含csv包,例如“--packages com.d”atabricks:spark-csv_2.10:1.5.0".
csv示例:
首先,您需要创建配置单元表,以便spark写入的数据与配置单元模式兼容(在将来的版本中可能不再需要此功能)
创建表:
完成后,您可以轻松地读取csv并将Dataframe保存到该表中。第二步将用类似“yyyy-mm-dd”的日期格式覆盖列日期。对于每个值,将创建一个文件夹,其中包含特定的行。
scala spark外壳示例:
前两行是配置单元配置,创建一个尚不存在的分区文件夹需要配置单元配置。
插入完成后,您可以直接查询表,如“select*from part\u parq\u table”。文件夹将在默认cloudera上的tablefolder中创建,例如hdfs:///users/hive/warehouse/part\parq\u table
希望对你有帮助
4xrmg8kj2#
如果您查看spark dataframe api和spark csv包,这将实现您要做的大部分工作—将csv文件读入一个dataframe,然后将dataframe作为parquet写出来,这将为您提供大部分方法。
您仍然需要对时间戳进行解析并使用结果对数据进行分区。
c3frrgcw3#
通过第二个tsv读取csv文件/user/hduser/wikipedia/pageviews
下面的代码使用spark2.0
将字符串时间戳转换为时间戳
写入Parquet文件。