Basically, I am using PySpark (jdbc format) to read a table from a database and then write the data to Azure Data Lake. The code I wrote works fine, except for very large tables (400,000 rows, 50 columns), where I get the following error:
Py4JJavaError: An error occurred while calling o94.parquet.
: org.apache.spark.SparkException: Job aborted.
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 2.0 failed 1 times, most recent failure: Lost task 23.0 in stage 2.0 (TID 25, 2612a419099c, executor driver): com.microsoft.sqlserver.jdbc.SQLServerException: SQL Server returned an incomplete response. The connection has been closed.
I assumed the problem was caused by insufficient memory allocation, so I increased the executor and driver memory to 10g. However, the problem persists. Here is my code:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import year, month

spkConfig = SparkConf() \
    .setAppName(appName) \
    .setMaster(master) \
    .set(f"fs.azure.account.auth.type.{azStorageAccount}.dfs.core.windows.net", "SharedKey") \
    .set(f"fs.azure.account.key.{azStorageAccount}.dfs.core.windows.net", azStorageKey) \
    .set("spark.executor.memory", "10g") \
    .set("spark.driver.memory", "10g") \
    .set("spark.cores.max", "5")

spkContext = SparkContext(conf=spkConfig)
sqlContext = SQLContext(spkContext)
spark = sqlContext.sparkSession
##
# Read table from DB to Dataframe partitioned on ID
##
def read_data(tableName, partitionCol, dbQuery, partitionSize, partitionUpperBound):
jdbcDF = spark.read.format("jdbc") \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.option("url", f"jdbc:sqlserver://{dbHost}:{dbPort};databaseName={dbDatabase}") \
.option("dbtable", dbQuery) \
.option("user", dbUsername) \
.option("password", dbPassword) \
.option("queryTimeout", 10) \
.option("numPartitions", partitionSize) \
.option("partitionColumn", partitionCol) \
.option("lowerBound", 1) \
.option("upperBound", partitionUpperBound) \
.load()
return jdbcDF
##
# Write Dataframe as automatically partitioned parquet files for each month
##
def write_data(tableName, tableDF, partitionCol):
    tableDF \
        .withColumn("year", year(partitionCol)) \
        .withColumn("month", month(partitionCol)) \
        .write.mode('overwrite') \
        .partitionBy('year', 'month') \
        .parquet(f"abfss://{azStorageContainer}@{azStorageAccount}.dfs.core.windows.net/" + tableName + ".parquet")