How to create a Delta table in ADLS Gen2 with local Spark, without Databricks

u7up0aaq · posted 2023-08-06 · Apache

I am trying to read a CSV file from ADLS Gen2 (Microsoft Azure) and create a Delta table from it. I can start a SparkSession successfully and read the CSV file through Spark on my local machine. However, when I try to write the DataFrame as a Delta table, I get the error Py4JJavaError: An error occurred while calling o120.save. : org.apache.spark.sql.delta.DeltaIllegalStateException: Versions (Vector(0, 0)) are not contiguous.

I have tried all combinations of different versions of Spark, Python, and the related jars, but I still cannot create a Delta table from my local machine.

These are the versions I am using to set up Spark and read data from ADLS Gen2: Spark 3.3.0, Python 3.10, delta-spark 2.3, azure-storage 8.6.6, hadoop-azure 3.3.1, hadoop-azure-datalake 3.3.3, hadoop-common 3.3.3.
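For reference, a quick check that the notebook is actually picking up these versions (a minimal sketch, assuming the packages were installed with pip):

import sys
from importlib.metadata import version

import pyspark

print(sys.version)             # expect 3.10.x
print(pyspark.__version__)     # expect 3.3.0
print(version("delta-spark"))  # expect 2.3.x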

This is the code I tried:

from pyspark.sql import SparkSession
from delta.tables import *

# storage_account_name, container_name, client_id, client_secret and
# directory_id are defined elsewhere in the notebook.

spark = SparkSession.builder.master("local[*]") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.jars", ",".join([  # repeated .config("spark.jars", ...) calls overwrite one another, so pass all jars as one comma-separated list
        r"C:\Users\Proem Sports\Documents\Jupyter notebooks\jars\mysql-connector-j-8.0.31.jar",
        r"C:\Users\Proem Sports\Documents\Jupyter notebooks\jars\azure-storage-8.6.6.jar",
        r"C:\Users\Proem Sports\Documents\Jupyter notebooks\jars\hadoop-azure-3.3.1.jar",
        r"C:\Users\Proem Sports\Documents\Jupyter notebooks\jars\hadoop-azure-datalake-3.3.3.jar",
        r"C:\Users\Proem Sports\Documents\Jupyter notebooks\jars\hadoop-common-3.3.3.jar",
    ])) \
    .config("spark.sql.legacy.parquet.int96RebaseModeInRead", "CORRECTED") \
    .config("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED") \
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED") \
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.sql.legacy.timeParserPolicy", "CORRECTED") \
    .config("spark.sql.warehouse.dir", r"C:\Users\Proem Sports\Documents\Jupyter notebooks\Dev_scripts\metastore_db") \
    .config("spark.delta.commitInfo.merge.enabled", "true") \
    .config("fs.azure.createRemoteFileSystemDuringInitialization", "false") \
    .config("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") \
    .config("fs.azure.account.auth.type", "SharedKey") \
    .config("fs.azure.account.key.proemdatalake.blob.core.windows.net", "account-key") \
    .enableHiveSupport() \
    .getOrCreate()

# OAuth (service principal) settings for the ADLS Gen2 account
spark.conf.set("fs.azure.account.auth.type." + storage_account_name + ".dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type." + storage_account_name + ".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id." + storage_account_name + ".dfs.core.windows.net", client_id)
spark.conf.set("fs.azure.account.oauth2.client.secret." + storage_account_name + ".dfs.core.windows.net", client_secret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint." + storage_account_name + ".dfs.core.windows.net", "https://login.microsoftonline.com/" + directory_id + "/oauth2/token")

df = spark.read.format("csv").load(
    "abfss://" + container_name + "@" + storage_account_name + ".dfs.core.windows.net/RAW_DATA/MERHCHANDISE/MERCH_20230424_.csv",
    header=True,
    inferSchema=True,
)

df.write.format("delta") \
    .option("overwriteSchema", "true") \
    .option("delta.columnMapping.mode", "name") \
    .save("abfss://" + container_name + "@" + storage_account_name + ".dfs.core.windows.net/tables/delta_table1")


The error I get is:

Py4JJavaError: An error occurred while calling o120.save.
: org.apache.spark.sql.delta.DeltaIllegalStateException: Versions (Vector(0, 0)) are not contiguous.
    at org.apache.spark.sql.delta.DeltaErrorsBase.deltaVersionsNotContiguousException(DeltaErrors.scala:852)
    at org.apache.spark.sql.delta.DeltaErrorsBase.deltaVersionsNotContiguousException$(DeltaErrors.scala:850)
    at org.apache.spark.sql.delta.DeltaErrors$.deltaVersionsNotContiguousException(DeltaErrors.scala:2293)
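One way to see what the log store actually wrote is to list the table's _delta_log directory through Hadoop's FileSystem API. A minimal sketch, assuming the session above is active and table_path (a name introduced here, not in the original) holds the abfss:// path passed to save():

log_path = table_path + "/_delta_log"
jpath = spark._jvm.org.apache.hadoop.fs.Path(log_path)
fs = jpath.getFileSystem(spark._jsc.hadoopConfiguration())
for status in fs.listStatus(jpath):
    # A healthy log has exactly one JSON file per commit, e.g. 00000000000000000000.json;
    # leftover temporary files are one plausible source of the duplicate version 0.
    print(status.getPath().getName())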


eit6fx6z · answer #1

You need to use the AzureLogStore in your configuration. Your session sets spark.delta.logStore.class to S3SingleDriverLogStore, which is written for S3's listing and rename semantics; on ADLS Gen2 it can surface the same commit version twice, which is most likely why Delta reports Versions (Vector(0, 0)) are not contiguous.

spark = SparkSession.builder.master("local[*]") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0")\
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.AzureLogStore") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.legacy.parquet.int96RebaseModeInRead", "CORRECTED") \
    .config("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED") \
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED") \
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "TRUE") \
    .config("spark.sql.legacy.timeParserPolicy", "CORRECTED") \
    .config("spark.sql.warehouse.dir", r"C:\Users\v-jgs\Desktop\warehouse") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.delta.commitInfo.merge.enabled", "true") \
    .config("fs.azure.createRemoteFileSystemDuringInitialization", "false") \
    .config("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") \
    .config("fs.azure.account.auth.type", "SharedKey") \
    .config("fs.azure.account.key.<account_name>.dfs.core.windows.net", "account_key") \
    .enableHiveSupport() \
    .getOrCreate()
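Once the write succeeds, a quick read-back confirms the commit history is contiguous (a small sketch; table_path stands for the abfss:// path passed to save() and is not from the original answer):

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, table_path)
delta_table.history().select("version", "operation").show()  # expect a single version 0 WRITE
spark.read.format("delta").load(table_path).show(5)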

Output: [screenshots omitted]
Change it to the Azure log store:

.config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.AzureLogStore")
