I am trying to read a CSV file from ADLS Gen2 (Microsoft Azure) and create a Delta table from it. I can start a SparkSession and read the CSV file through Spark on my local machine without problems. However, when I try to write the DataFrame as a Delta table, I get the error Py4JJavaError: An error occurred while calling o120.save. : org.apache.spark.sql.delta.DeltaIllegalStateException: Versions (Vector(0, 0)) are not contiguous.
I have tried every combination of Spark, Python, and related JAR versions I could think of, but I still cannot create a Delta table from my local machine.
These are the versions I used to set up Spark and read the data from ADLS Gen2: Spark 3.3.0, Python 3.10, delta-spark 2.3, azure-storage 8.6.6, hadoop-azure 3.3.1, hadoop-azure-datalake 3.3.3, hadoop-common 3.3.3.
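(For pinning a matching Spark/Delta pair in a local setup like this, the delta-spark pip package also ships a configure_spark_with_delta_pip helper that injects the delta-core Maven coordinate matching the installed pip version. A minimal sketch, assuming pyspark 3.3.0 and delta-spark 2.3.0 are installed and that the remaining configs below are added to the same builder:

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder.master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
# Appends io.delta:delta-core_2.12:<installed delta-spark version> to
# spark.jars.packages, so the JVM-side Delta version cannot drift from
# the Python-side one.
spark = configure_spark_with_delta_pip(builder).getOrCreate())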
Here is the code I tried:
from pyspark.sql import SparkSession
from delta.tables import *

# storage_account_name, client_id, client_secret, directory_id and
# container_name are defined elsewhere in the notebook.
jars = [
    r"C:\Users\Proem Sports\Documents\Jupyter notebooks\jars\mysql-connector-j-8.0.31.jar",
    r"C:\Users\Proem Sports\Documents\Jupyter notebooks\jars\azure-storage-8.6.6.jar",
    r"C:\Users\Proem Sports\Documents\Jupyter notebooks\jars\hadoop-azure-3.3.1.jar",
    r"C:\Users\Proem Sports\Documents\Jupyter notebooks\jars\hadoop-azure-datalake-3.3.3.jar",
    r"C:\Users\Proem Sports\Documents\Jupyter notebooks\jars\hadoop-common-3.3.3.jar",
]

spark = (
    SparkSession.builder.master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    # spark.jars takes a single comma-separated list; setting the key
    # repeatedly would keep only the last value.
    .config("spark.jars", ",".join(jars))
    .config("spark.sql.legacy.parquet.int96RebaseModeInRead", "CORRECTED")
    .config("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED")
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED")
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.legacy.timeParserPolicy", "CORRECTED")
    .config("spark.sql.warehouse.dir", r"C:\Users\Proem Sports\Documents\Jupyter notebooks\Dev_scripts\metastore_db")
    .config("spark.delta.commitInfo.merge.enabled", "true")
    .config("fs.azure.createRemoteFileSystemDuringInitialization", "false")
    .config("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    .config("fs.azure.account.auth.type", "SharedKey")
    .config("fs.azure.account.key.proemdatalake.blob.core.windows.net", "account-key")
    .enableHiveSupport()
    .getOrCreate()
)

# OAuth (service principal) credentials for the ABFS driver
spark.conf.set("fs.azure.account.auth.type." + storage_account_name + ".dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type." + storage_account_name + ".dfs.core.windows.net",
               "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id." + storage_account_name + ".dfs.core.windows.net", client_id)
spark.conf.set("fs.azure.account.oauth2.client.secret." + storage_account_name + ".dfs.core.windows.net", client_secret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint." + storage_account_name + ".dfs.core.windows.net",
               "https://login.microsoftonline.com/" + directory_id + "/oauth2/token")

df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/RAW_DATA/MERHCHANDISE/MERCH_20230424_.csv")
)

(
    df.write.format("delta")
    .option("overwriteSchema", "true")
    .option("delta.columnMapping.mode", "name")
    .save(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/tables/delta_table1")
)
The error I get is:

Py4JJavaError: An error occurred while calling o120.save.
: org.apache.spark.sql.delta.DeltaIllegalStateException: Versions (Vector(0, 0)) are not contiguous.
    at org.apache.spark.sql.delta.DeltaErrorsBase.deltaVersionsNotContiguousException(DeltaErrors.scala:852)
    at org.apache.spark.sql.delta.DeltaErrorsBase.deltaVersionsNotContiguousException$(DeltaErrors.scala:850)
    at org.apache.spark.sql.delta.DeltaErrors$.deltaVersionsNotContiguousException(DeltaErrors.scala:2293)
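(To see what Delta actually finds in the transaction log, one can list the _delta_log directory through Spark's bundled Hadoop filesystem API. A hypothetical diagnostic sketch; the path is the save target from the code above:

log_dir = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/tables/delta_table1/_delta_log"
jvm = spark.sparkContext._jvm
jpath = jvm.org.apache.hadoop.fs.Path(log_dir)
fs = jpath.getFileSystem(spark.sparkContext._jsc.hadoopConfiguration())
# Each healthy commit appears exactly once as a zero-padded JSON file
# (00000000000000000000.json, 00000000000000000001.json, ...); the
# Vector(0, 0) in the error means Delta read version 0 twice.
for status in fs.listStatus(jpath):
    print(status.getPath().getName()))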
1 Answer
You need to use the AzureLogStore in your configuration. Your builder sets spark.delta.logStore.class to org.apache.spark.sql.delta.storage.S3SingleDriverLogStore, which is the log store implementation for S3. On ADLS Gen2 it does not provide the atomic, mutually exclusive commits that the Delta transaction log relies on, which is how two commits can both end up recorded as version 0 and trigger "Versions (Vector(0, 0)) are not contiguous". Change it to the Azure log store.
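A minimal sketch of the fix, keeping the rest of the builder from the question unchanged (the class name is the Azure log store that ships with delta-core):

spark = (
    SparkSession.builder.master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Azure log store instead of the S3 one: this is what gives Delta
    # atomic commit semantics on abfss:// paths.
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.AzureLogStore")
    # ... all remaining .config(...) calls from the question unchanged ...
    .getOrCreate()
)

After this change the write in the question should commit versions 0, 1, 2, ... contiguously.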