PySpark JDBC timeout when writing

c7rzv4ha asked on 2021-05-27 in Spark

So basically I am using PySpark (the jdbc format) to read tables from a database and then write the data to Azure Data Lake. The code works fine, except on a very large table (400,000 rows, 50 columns), where it fails with the following error:

Py4JJavaError: An error occurred while calling o94.parquet.
: org.apache.spark.SparkException: Job aborted.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 2.0 failed 1 times, most recent failure: Lost task 23.0 in stage 2.0 (TID 25, 2612a419099c, executor driver): com.microsoft.sqlserver.jdbc.SQLServerException: SQL Server returned an incomplete response. The connection has been closed.

I assumed the problem was insufficient memory allocation, so I increased both executor and driver memory to 10g. However, the problem persists. Here is my code:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import year, month

# ADLS Gen2 shared-key auth plus executor/driver memory settings
spkConfig = SparkConf() \
    .setAppName(appName) \
    .setMaster(master) \
    .set(f"fs.azure.account.auth.type.{azStorageAccount}.dfs.core.windows.net", "SharedKey") \
    .set(f"fs.azure.account.key.{azStorageAccount}.dfs.core.windows.net", azStorageKey) \
    .set("spark.executor.memory", "10g") \
    .set("spark.driver.memory", "10g") \
    .set("spark.cores.max", "5")
spkContext = SparkContext(conf=spkConfig)
sqlContext = SQLContext(spkContext)
spark = sqlContext.sparkSession
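
(For reference, on Spark 2.x+ the same setup can also be expressed directly through SparkSession.builder instead of going through the legacy SQLContext wrapper; a minimal sketch, assuming the same appName/master/azStorageAccount/azStorageKey variables as above:)

# Minimal sketch: equivalent session setup via SparkSession.builder
# (assumes the same appName/master/azStorageAccount/azStorageKey variables)
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .config(f"fs.azure.account.auth.type.{azStorageAccount}.dfs.core.windows.net", "SharedKey") \
    .config(f"fs.azure.account.key.{azStorageAccount}.dfs.core.windows.net", azStorageKey) \
    .config("spark.executor.memory", "10g") \
    .config("spark.driver.memory", "10g") \
    .config("spark.cores.max", "5") \
    .getOrCreate()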

##
# Read table from DB to Dataframe partitioned on ID
##

def read_data(tableName, partitionCol, dbQuery, partitionSize, partitionUpperBound):
    # queryTimeout is in seconds; numPartitions/partitionColumn/lowerBound/upperBound
    # control how Spark splits the read into parallel JDBC queries
    jdbcDF = spark.read.format("jdbc") \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .option("url", f"jdbc:sqlserver://{dbHost}:{dbPort};databaseName={dbDatabase}") \
        .option("dbtable", dbQuery) \
        .option("user", dbUsername) \
        .option("password", dbPassword) \
        .option("queryTimeout", 10) \
        .option("numPartitions", partitionSize) \
        .option("partitionColumn", partitionCol) \
        .option("lowerBound", 1) \
        .option("upperBound", partitionUpperBound) \
        .load()

    return jdbcDF
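
For context, here is what a call to read_data looks like; the table, column, and bound values below are made up for illustration. Note that Spark's "dbtable" option needs a subquery alias when passed a query rather than a bare table name, and that lowerBound/upperBound only decide how the partition ranges are split, not which rows are read.

# Hypothetical call: read dbo.Orders in 20 parallel partitions on its ID column
ordersDF = read_data(
    tableName="Orders",                          # made-up table name
    partitionCol="ID",                           # numeric column to split on
    dbQuery="(SELECT * FROM dbo.Orders) AS t",   # "dbtable" requires an alias for subqueries
    partitionSize=20,                            # becomes numPartitions
    partitionUpperBound=400000                   # roughly max(ID) in the table
)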

##
# Write Dataframe as automatically partitioned parquet files for each month
##

def write_data(tableName, tableDF, partitionCol):
    # Derive year/month from the date column and write partitioned parquet
    tableDF \
        .withColumn("year", year(partitionCol)) \
        .withColumn("month", month(partitionCol)) \
        .write.mode("overwrite") \
        .partitionBy("year", "month") \
        .parquet(f"abfss://{azStorageContainer}@{azStorageAccount}.dfs.core.windows.net/{tableName}.parquet")
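
And the matching write; partitionCol must be a date/timestamp column here, since year() and month() are applied to it (again, the names are illustrative):

# Hypothetical call: write parquet partitioned by year/month of OrderDate
write_data("Orders", ordersDF, "OrderDate")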
