I'm using the Python API with PySpark to run a Delta Lake merge operation. After the merge I trigger a compaction step, but the compaction fails with the following error:
Error:
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 170, in load
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1248, in __call__
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1212, in _build_args
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1199, in _get_args
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_collections.py", line 501, in convert
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1248, in __call__
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1218, in _build_args
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1218, in <listcomp>
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 298, in get_command_part
AttributeError: 'DeltaTable' object has no attribute '_get_object_id'
Code:
delta_table = "delta_lake_path"
df = spark.read.csv("s3n://input_file.csv", header=True)
delta_table = DeltaTable.forPath(spark, delta_table)
delta_table.merge(df, "df.id = delta_table.id").whenNotMatchedInsertAll().execute()

# compaction
spark.read.format("delta").load(delta_table).repartition(1) \
    .write.option("dataChange", "False") \
    .format("delta").mode("overwrite").save(delta_table)
Can someone tell me why the Spark session can't create another DeltaTable instance? I need to run the merge and the compaction in the same script, because I only want to compact the partitions that the merge touched. Those partitions are derived from the unique values present in the DataFrame df created from input_file.csv.
1 Answer
I think the problem is the delta_table variable: it starts out as a string holding the Delta Lake path, but you then rebind it to a DeltaTable object and later pass that object to the .load() method, which expects a path string. py4j cannot serialize the DeltaTable as a call argument, which is where the _get_object_id error comes from. Keeping the path and the table object in separate variables fixes it:
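A minimal sketch with the variables separated, using the same paths as in the question (the .alias() calls are an addition so the merge condition can actually resolve df.id and delta_table.id):

from delta.tables import DeltaTable

delta_table_path = "delta_lake_path"   # the path stays a plain string
df = spark.read.csv("s3n://input_file.csv", header=True)

# the DeltaTable object gets its own name
delta_table = DeltaTable.forPath(spark, delta_table_path)
delta_table.alias("delta_table") \
    .merge(df.alias("df"), "df.id = delta_table.id") \
    .whenNotMatchedInsertAll() \
    .execute()

# compaction: .load() receives the path string, not the DeltaTable object
spark.read.format("delta").load(delta_table_path) \
    .repartition(1) \
    .write.option("dataChange", "false") \
    .format("delta").mode("overwrite") \
    .save(delta_table_path)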
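And since you only want to compact the partitions the merge touched, you can pair dataChange=false with Delta's replaceWhere write option to rewrite just those partitions. The partition column name part_col below is a placeholder for whatever column your table is actually partitioned by:

# hypothetical partition column "part_col"; the merged values come from the input DataFrame
merged_values = [r["part_col"] for r in df.select("part_col").distinct().collect()]
predicate = "part_col IN ({})".format(", ".join("'{}'".format(v) for v in merged_values))

(spark.read.format("delta").load(delta_table_path)
    .where(predicate)                   # read only the merged partitions
    .repartition(1)
    .write
    .option("dataChange", "false")      # mark the rewrite as not changing data
    .option("replaceWhere", predicate)  # overwrite only those partitions
    .format("delta")
    .mode("overwrite")
    .save(delta_table_path))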