spark-读写表时保持分区

bq8i3lrv  于 2021-05-18  发布在  Spark
关注(0)|答案(1)|浏览(443)

我使用spark(尤其是pyspark)并有以下场景:
有两张table(a和b)
a由c列和d列分隔。
我想从a读取所有数据,做一个微小的转换(例如添加一个静态列),然后将结果保存到b。
b应该和a有完全相同的分区。
洗牌需要避免,因为我们有30亿行
所以从逻辑的Angular 来看,这应该是很容易做到这一点没有任何洗牌。
但是怎么做呢?
版本1

df = spark.read.parquet("pathToA")
df = df.withColumn("x", f.lit("x"))
df.write.format("parquet").save("pathToB")

在这种情况下,表b根本没有分区。
版本2

df = spark.read.parquet("pathToA")
df = df.withColumn("x", f.lit("x"))
df.write.format("parquet").partitionBy("c", "d").save("pathToB")

在这种情况下,有很多洗牌,这需要永远。
版本3

df = spark.read.parquet("pathToA")
df = df.withColumn("x", f.lit("x"))
df = df.repartition("c", "d")
df.write.format("parquet").partitionBy("c", "d").save("pathToB")

与版本2相同->大量的洗牌,永远不会结束。
如果有人有一个想法,如何存档这个不洗牌,这将是非常有帮助的!提前多谢!
祝你好运,本杰明

d4so4syb

d4so4syb1#

事实上,第2版是正确的方法:

df = spark.read.parquet("pathToA")
df = df.withColumn("x", f.lit("x"))
df.write.format("parquet").partitionBy("c", "d").save("pathToB")

你可以做的是,如果版本1根本没有洗牌,你可以读入c,d的每一个排列,它与版本1是一样的。但我怀疑这会更快:

permutations = [(c,d) for (c,d) in df.dropDuplicates(['c','d']).collect()]

for (c, d) in permutations:
  df = spark.read.parquet("pathToA").filter(f'c = "{c}" AND d = "{d}"')
  df = df.withColumn("x", f.lit("x"))
  df.write.format("parquet").save(f'pathToB/c={c}/d={d}')

这是一种变通方法,我认为如果你有一个Hive表上面的文件夹,你需要使用 msck repair table 刷新它。

相关问题