I'm building a data pipeline in Azure Synapse.

The basic flow: grab some CSV files of 837 EDI data and land them in Azure Data Lake (Gen2). A ForEach over the files converts each one into table format in a Spark database named claims. See my flow:

My flow

My problem: it takes too long to run. Each file seems to have to create a new Spark session, and the overhead is too much (3 minutes each). I want to "declare" a session via appName and reuse it throughout the process. I have 3 test files: one with 10 rows, one with 2 rows, and a third with 10 rows. 22 rows, 12 minutes total.

In my flow you can see the ForEach loop runs 2 activities per file (one notebook and one stored procedure), depending on whether it's an 837i or an 837p.
My notebook code:
```python
import re
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

# create Spark session with necessary configuration
spark = (SparkSession
    .builder
    .appName("837App")
    .config("spark.network.timeout", "600s")
    .config("spark.executor.heartbeatInterval", "10s")
    .getOrCreate())

# prep the variables for the source file name
# (srcFilesDirectory, srcFileName, srcFileDelimeter, sparkDbName are pipeline parameters)
srcPath = "abfss://folder@server.dfs.core.windows.net"
srcFQFN = f"{srcPath}/{srcFilesDirectory}/{srcFileName}"
dstTableName = "raw_837i"

# read the flat file into a data frame
df = spark.read.load(srcFQFN,
                     format='csv',
                     delimiter=srcFileDelimeter,
                     header=True)

# add an AutoID column
adf = df.withColumn('AutoID', row_number().over(Window.orderBy(monotonically_increasing_id())))

# clean up column names
adf = adf.toDF(*(re.sub(r"[\.\s\(\)\-]+", "_", c) for c in adf.columns))

# now the Spark database side...
# create the destination database if it does not exist
spark.sql(f"CREATE DATABASE IF NOT EXISTS {sparkDbName}")

# write the dataframe to a Spark table; change mode from overwrite to append to insert instead
adf.write.mode("overwrite").saveAsTable(f"{sparkDbName}.{dstTableName}")
```
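The column-name cleanup step can be sanity-checked on its own, without a Spark session. A quick sketch with made-up 837-style headers (the column names are hypothetical, for illustration only):

```python
import re

# hypothetical 837-style column headers, for illustration only
cols = ["Claim No.", "Service (From) Date", "Billed-Amount"]

# same substitution the notebook applies before saveAsTable:
# collapse runs of dots, spaces, parens, and hyphens into "_"
cleaned = [re.sub(r"[\.\s\(\)\-]+", "_", c) for c in cols]
print(cleaned)  # ['Claim_No_', 'Service_From_Date', 'Billed_Amount']
```

Note the trailing underscore on `Claim_No_`: a trailing `.` is also replaced, not stripped.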
Thanks @sequinex and @bendemann
What I've tried:

Added a notebook at the start of the pipeline to set up the session; see "set 837 env" in my flow. The intent is that if no session with that appName exists, it creates one, which is then reused later. That way I pay the 3-minute startup cost once at the front of the pipeline instead of once per file.
```python
from pyspark.sql import SparkSession

# create Spark session with necessary configuration
spark = (SparkSession
    .builder
    .appName("837App")
    .config("spark.network.timeout", "600s")
    .config("spark.executor.heartbeatInterval", "10s")
    .getOrCreate())
sc = spark.sparkContext
```
I can't prove it is actually using this appName (so if anyone can help with that too).
I tried:
```python
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("837App").getOrCreate()
sc
```
Result:

```
<SparkContext master=yarn appName=Synapse_sparkPrimary_1618935780>
```

Shouldn't that be appName=837App?
I also tried stopping the existing session and starting my own:
```python
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

sc.stop()
spark = SparkSession.builder.appName("837App").getOrCreate()
sc
```
But I get the following error:
```
Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.IllegalStateException: Promise already completed.
	at scala.concurrent.Promise$class.complete(Promise.scala:55)
	at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
	at scala.concurrent.Promise$class.success(Promise.scala:86)
	at scala.concurrent.impl.Promise$DefaultPromise.success(Promise.scala:157)
	at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$sparkContextInitialized(ApplicationMaster.scala:392)
	at org.apache.spark.deploy.yarn.ApplicationMaster$.sparkContextInitialized(ApplicationMaster.scala:808)
	at org.apache.spark.scheduler.cluster.YarnClusterScheduler.postStartHook(YarnClusterScheduler.scala:32)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:566)
	at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 173, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367, in getOrCreate
    SparkContext(conf=conf or SparkConf())
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 136, in __init__
    conf, jsc, profiler_cls)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 198, in _do_init
    self._jsc = jsc or self._initialize_context(self._conf._jconf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 306, in _initialize_context
    return self._jvm.JavaSparkContext(jconf)
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1525, in __call__
    answer, self._gateway_client, None, self._fqn)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.IllegalStateException: Promise already completed.
	(same Java stack trace as above)
```
Thanks in advance!