我正在编写一个Spark作业,从json文件中读取数据并将其写入parquet文件,下面是示例代码:
DataFrame dataFrame = new DataFrameReader(sqlContext).json(textFile);
dataFrame = dataFrame.withColumn("year", year(to_date(unix_timestamp(dataFrame.col("date"), "YYYY-MM-dd'T'hh:mm:ss.SSS").cast("timestamp"))));
dataFrame = dataFrame.withColumn("month", month(to_date(unix_timestamp(dataFrame.col("date"), "YYYY-MM-dd'T'hh:mm:ss.SSS").cast("timestamp"))));
dataFrame.write().mode(SaveMode.Append).partitionBy("year", "month").parquet("<some_path>");
Json文件由大量json记录组成,我希望记录在parquet中更新,如果它已经存在。我已经尝试了Append
模式,但它似乎是在文件级别而不是记录级别工作(即如果文件已经存在,它会在最后写入)。因此,为同一文件运行此作业会复制记录。
有没有什么方法可以指定dataframe行id作为唯一键,并要求spark更新已经存在的记录?所有的保存模式似乎都是检查文件而不是记录。
3条答案
按热度按时间xjreopfe1#
Parquet是一种文件格式而不是数据库,为了实现按id更新,你将需要读取文件,更新内存中的值,而不是将数据重新写入新文件(或覆盖现有文件)。
如果这是一个经常发生的用例,那么使用数据库可能会更好。
ao218c7q2#
你可以看看Apache ORC文件格式,看看:
https://orc.apache.org/docs/acid.html
根据您的用例,或者HBase,如果您想保持HDFS的领先地位。
但请记住,HDFS是一个只写一次的文件系统,如果这不适合你的需要,可以使用其他文件系统(比如elasticsearch,mongodb)。
否则,在HDFS中,每次都必须创建新文件,必须设置一个增量过程来构建“delta”文件,然后合并OLD + DELTA = NEW_DATA。
oxcyiej73#
你也可以看看apache hudi(https://hudi.apache.org/),它提供了对parquet文件更新的支持。