Importing a pyspark function with a SparkContext from a script

xmq68pz9  ·  posted on 2021-05-27  ·  in Spark

I'm importing a function called `erSvr` from a script named "etldatalake.py" that is stored in S3. I've imported the script, and other functions from it, in the same way, and they work fine in my code, but when I call this particular function I get the error below. Can anyone tell me what the problem is and how to fix it? I'm running this code in a SageMaker notebook on AWS, but I suspect this is really a Spark issue.
Script code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

# updated 11/19/19 for error caused in error logging function

spark = glueContext.spark_session

from pyspark.sql import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import first
from pyspark.sql.functions import date_format
from pyspark.sql.functions import lit  # StringType is a type, not a function; it comes from pyspark.sql.types below
from pyspark.sql.types import *
from pyspark.sql.functions import (to_date, substring, length, min, when,
                                   format_number, dayofmonth, hour, dayofyear,
                                   month, year, weekofyear, unix_timestamp)
import time
import math
import datetime
from pyspark.sql.functions import UserDefinedFunction

# testing

base_path='s3://example/'

# importing script with code

sc = spark.sparkContext  # define 'sc' explicitly; Livy notebooks may predefine it, this script does not
sc.addPyFile(base_path+'etldatalake/etldatalake.py')
from etldatalake import *

# running function

# 'e' below is presumably an exception caught in a surrounding try/except (not shown)
erSvr(pth=base_path+'error_logs/test/',err=e,nm='5_overwrite')
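For context on the error below: a script shipped with `addPyFile` is imported as an ordinary Python module, so code inside etldatalake.py only sees etldatalake.py's own globals, never the notebook's. A minimal pure-Python sketch of that isolation (the module name `mod_demo` and the variable `shared` are made up for illustration):

```python
import os
import sys
import tempfile

# Write a tiny module to disk, mimicking a script shipped with addPyFile.
# The function inside references a global name 'shared' that the module
# itself never defines.
module_src = "def use_shared():\n    return shared\n"

tmpdir = tempfile.mkdtemp()
with open(os.path.join(tmpdir, "mod_demo.py"), "w") as f:
    f.write(module_src)

sys.path.insert(0, tmpdir)
import mod_demo

shared = "visible only in this script"  # defined in the *caller's* globals

try:
    mod_demo.use_shared()
except NameError as exc:
    # The module cannot see the caller's globals, just as etldatalake.py
    # cannot see a 'spark' variable defined in the notebook.
    print("NameError:", exc)
```

This is why every name the module's functions use has to be imported or defined inside the module itself.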

Function code in etldatalake.py:

def erSvr(pth,err,nm):

    # updated 11/11/19
    # adding current date to filepath for error logs
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from pyspark.sql.functions import lit  # needed below; etldatalake.py has its own namespace

    glueContext = GlueContext(SparkContext.getOrCreate())
    spark = glueContext.spark_session

    dateFormat = "%Y%m%d_%H%M"
    import datetime
    ts=spark.sql(""" select current_timestamp() as ctime """).collect()[0]["ctime"]

    tdydate=ts.strftime(dateFormat)

    log_txt = glueContext.create_dynamic_frame_from_options("s3", {'paths': [pth] }, format="csv" )

    log_txt_df=log_txt.toDF()

    log_txt_df=log_txt_df.withColumn('error',lit(str(err)))

    log_txt_df.write.csv(pth+tdydate+'/'+nm,sep='\t')
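Incidentally, the only thing `erSvr` uses the Spark session for before writing is fetching the current timestamp. The same `tdydate` string can be produced with the standard library alone, which removes one dependency on `spark` inside the module (a sketch, not part of the original code):

```python
import datetime

# Same "%Y%m%d_%H%M" format the function uses for the log folder name.
dateFormat = "%Y%m%d_%H%M"
tdydate = datetime.datetime.now().strftime(dateFormat)
print(tdydate)  # e.g. 20210527_1432
```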

Error:

An error was encountered:
name 'spark' is not defined
Traceback (most recent call last):
  File "/mnt/yarn/usercache/livy/appcache/application_1588629995174_0030/spark-bea7bc4e-cc37-496c-8c6b-c8182ec46f3b/userFiles-a7937998-0b7c-440d-8a94-d7ad98e064b1/etldatalake.py", line 61, in erSvr
    ts=spark.sql(""" select current_timestamp() as ctime """).collect()[0]["ctime"]
NameError: name 'spark' is not defined
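Whatever version of etldatalake.py is actually being picked up, a robust pattern is for the module function to resolve its own dependencies lazily instead of relying on the importer's globals. A pure-Python sketch of that pattern (`get_session` here merely stands in for `SparkSession.builder.getOrCreate()`; all names are illustrative):

```python
_session = None

def get_session():
    # Lazily create the shared resource on first use, then reuse it --
    # the same idea as SparkSession.builder.getOrCreate().
    global _session
    if _session is None:
        _session = {"id": "session-1"}  # stand-in for a real SparkSession
    return _session

def er_svr_style(nm):
    # Resolve the session inside the function, never from caller globals.
    session = get_session()
    return session["id"], nm

print(er_svr_style("5_overwrite"))  # -> ('session-1', '5_overwrite')
```

Repeated calls return the same session object, so the function works no matter which script imports it.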
