How to create a session that multiple notebooks can use with PySpark in Azure Synapse Analytics

Asked by ttcibm8c on 2021-07-14 in Spark

I am building a data pipeline in Azure Synapse.
Basic flow:
Grab some CSV files of 837 EDI data. Land those data files in Azure Data Lake (Gen2). For each file, convert the data to tabular format in a Spark database named claims. See my flow:
[screenshot: my pipeline flow]
My problem: it takes far too long to run. It seems that every file has to create a new Spark session, and the overhead is too much (3 minutes each). I want to "declare" a session via appName and use it throughout the process. I have 3 test files, one with 10 rows, another with 2 rows, and a third with 10 rows: 22 rows, 12 minutes total.
In my flow you can see that the foreach loop runs 2 activities per file (one notebook, one stored procedure), depending on whether it is an 837i or an 837p.
My notebook code:

```python
import re
from pyspark.sql.functions import desc, row_number, monotonically_increasing_id
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

# create Spark session with necessary configuration
spark = (SparkSession
    .builder
    .appName("837App")
    .config("spark.network.timeout", "600s")
    .config("spark.executor.heartbeatInterval", "10s")
    .getOrCreate())

# prep the variables for the source file name
# (srcFilesDirectory, srcFileName, srcFileDelimeter and sparkDbName
#  are notebook parameters passed in from the pipeline)
srcPath = "abfss://folder@server.dfs.core.windows.net"
srcFQFN = f"{srcPath}/{srcFilesDirectory}/{srcFileName}"
dstTableName = "raw_837i"

# read the flat file into a data frame
df = spark.read.load(srcFQFN,
    format='csv',
    delimiter=srcFileDelimeter,
    header=True
)

# add an AutoID column
adf = df.withColumn('AutoID', row_number().over(Window.orderBy(monotonically_increasing_id())))

# clean up column names
adf = adf.toDF(*(re.sub(r"[\.\s\(\)\-]+", "_", c) for c in adf.columns))

# now the Spark database side...

# create the destination database if it does not exist
spark.sql(f"CREATE DATABASE IF NOT EXISTS {sparkDbName}")

# write the dataframe to a Spark table; change mode from "overwrite" to "append" to insert instead
adf.write.mode("overwrite").saveAsTable(f"{sparkDbName}.{dstTableName}")
```
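For reference, a quick sanity check I can run right after the write (a minimal sketch reusing the same sparkDbName / dstTableName variables from the notebook above):

```python
# count rows in the table that was just written
# (sparkDbName and dstTableName as defined in the notebook above)
row_count = spark.sql(f"SELECT COUNT(*) AS n FROM {sparkDbName}.{dstTableName}").first()["n"]
print(f"{sparkDbName}.{dstTableName}: {row_count} rows")
```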

Thanks @sequinex and @bendemann.
What I have tried:
I added a notebook at the beginning of the pipeline to set up the session; see set 837 env in my flow. The intent is that if a session with that appName does not already exist, it gets created there and reused later, so I pay the ~3 minutes of startup once at the front of the pipeline instead of once per file.

```python
from pyspark.sql import SparkSession

# create Spark session with necessary configuration
spark = (SparkSession
    .builder
    .appName("837App")
    .config("spark.network.timeout", "600s")
    .config("spark.executor.heartbeatInterval", "10s")
    .getOrCreate())

sc = spark.sparkContext
```

I can't prove that it is actually using this appName (so if someone can help with that as well).
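A minimal way to inspect what the active session is actually running as (assuming the spark handle from the set-up notebook above):

```python
# inspect the identity of the live session/context
print(spark.sparkContext.appName)          # name the running SparkContext was started with
print(spark.conf.get("spark.app.name"))    # same value via the session conf
print(spark.sparkContext.applicationId)    # compare across notebooks: matching IDs
                                           # mean they share one Spark application
```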
I tried:

```python
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("837App").getOrCreate()
sc
```

Result:

<SparkContext master=yarn appName=Synapse_sparkPrimary_1618935780>

Shouldn't that be appName=837App?
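My suspicion (unconfirmed) is that getOrCreate() simply hands back the session Synapse already started and ignores the builder's appName. A sketch that would demonstrate this:

```python
from pyspark.sql import SparkSession

# when a session already exists, getOrCreate() returns that session;
# the underlying SparkContext keeps the appName it was started with
s1 = SparkSession.builder.appName("837App").getOrCreate()
s2 = SparkSession.builder.appName("SomethingElse").getOrCreate()
print(s1 is s2)                    # True: the exact same session object
print(s1.sparkContext.appName)     # still the name the cluster started with
```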
I also tried stopping the existing session and starting my own:

```python
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

sc.stop()
spark = SparkSession.builder.appName("837App").getOrCreate()
sc
```

But I get the following error:

Traceback (most recent call last):

  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 173, in getOrCreate
    sc = SparkContext.getOrCreate(sparkConf)

  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367, in getOrCreate
    SparkContext(conf=conf or SparkConf())

  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 136, in __init__
    conf, jsc, profiler_cls)

  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 198, in _do_init
    self._jsc = jsc or self._initialize_context(self._conf._jconf)

  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 306, in _initialize_context
    return self._jvm.JavaSparkContext(jconf)

  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1525, in __call__
    answer, self._gateway_client, None, self._fqn)

  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    return f(*a,**kw)

  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)

py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.IllegalStateException: Promise already completed.
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
    at scala.concurrent.Promise$class.success(Promise.scala:86)
    at scala.concurrent.impl.Promise$DefaultPromise.success(Promise.scala:157)
    at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$sparkContextInitialized(ApplicationMaster.scala:392)
    at org.apache.spark.deploy.yarn.ApplicationMaster$.sparkContextInitialized(ApplicationMaster.scala:808)
    at org.apache.spark.scheduler.cluster.YarnClusterScheduler.postStartHook(YarnClusterScheduler.scala:32)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:566)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:238)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
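Reading the stack, my (unverified) interpretation is that in Synapse the SparkContext lives inside a YARN ApplicationMaster that completes its sparkContextInitialized promise exactly once, so after sc.stop() a second context cannot be registered in the same application. The only pattern that seems safe is re-attaching to the managed context rather than stopping it:

```python
from pyspark.sql import SparkSession

# do not stop the managed context; just re-attach to it
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
```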

TIA
