I am writing a function to validate the date format of a column in a PySpark DataFrame. Here I create the SparkSession:
    from pyspark.sql import SparkSession

    spark = SparkSession \
        .builder \
        .appName("Validations") \
        .master("local[*]") \
        .getOrCreate()
And this is the function:
    from datetime import datetime

    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import BooleanType

    def validDate(date, frmt):
        # True if the string parses with the given format, False otherwise
        try:
            datetime.strptime(date, frmt)
            return True
        except (ValueError, TypeError):
            return False

    def checkDateFormat(column, spkDF, formatChoosen):
        # Wrap validDate in a UDF that flags each row as valid/invalid
        dateUDF = udf(lambda x: validDate(str(x), formatChoosen), BooleanType())
        temp_df = spkDF.withColumn('check', dateUDF(col(column)))
        # Keep only the rows whose date failed validation
        temp_df = temp_df.filter(temp_df['check'] == False)
        cols = [str(column)]
        DF = temp_df.select(*cols)
        dfCount = DF.count()
        return DF, dfCount
So in checkDateFormat I pass the column name (a string), spkDF (the Spark DataFrame), and the date format, e.g. "%Y-%m-%d %H:%M:%S". The function:
creates a UDF that applies validDate,
adds a new 'check' column holding True or False depending on whether the date matches the format,
filters to keep only the invalid rows,
selects only the column being checked,
and returns that DataFrame and its row count (see the usage sketch just below).
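For reference, a minimal usage sketch; the sample data and the column name "event_date" are made-up examples, not from my real pipeline:

    # Hypothetical sample data to exercise checkDateFormat
    data = [("2021-03-01 10:00:00",), ("not-a-date",)]
    sdf = spark.createDataFrame(data, ["event_date"])

    badDF, badCount = checkDateFormat("event_date", sdf, "%Y-%m-%d %H:%M:%S")
    badDF.show()      # only the rows that failed validation
    print(badCount)   # expected: 1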
When I run this in a Jupyter notebook I have no problems, but when I execute it from the conda terminal I get the following error:
Traceback (most recent call last):
File "C:\Users\AKAINIX ANALYTICS\Documents\Lucas\Antarctic\Bitbucket\plataforma-dataquality\DataQuality\API\rest_api_flask.py", line 922, in executeProcess
validations_calc.validate(post_data, db_access)
File "C:\Users\AKAINIX ANALYTICS\Documents\Lucas\Antarctic\Bitbucket\plataforma-dataquality\DataQuality\pyScripts\validations_calc.py", line 897, in validate
DF, dfCount = validateGroup3(rule, col, param, processConf['rulesConf'][rule][conditionPlace]['conditions'], spark_df, df)
File "C:\Users\AKAINIX ANALYTICS\Documents\Lucas\Antarctic\Bitbucket\plataforma-dataquality\DataQuality\pyScripts\validations_calc.py", line 760, in validateGroup3
return checkDateFormat(col, spark_df, param)
File "C:\Users\AKAINIX ANALYTICS\Documents\Lucas\Antarctic\Bitbucket\plataforma-dataquality\DataQuality\pyScripts\validations_calc.py", line 221, in checkDateFormat
dfCount = DF.count()
File "C:\Users\AKAINIX ANALYTICS\anaconda3\lib\site-packages\pyspark\sql\dataframe.py", line 585, in count
return int(self._jdf.count())
File "C:\Users\AKAINIX ANALYTICS\anaconda3\lib\site-packages\py4j\java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "C:\Users\AKAINIX ANALYTICS\anaconda3\lib\site-packages\pyspark\sql\utils.py", line 137, in deco
raise_from(converted)
File "<string>", line 3, in raise_from
pyspark.sql.utils.PythonException:
An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace.
Traceback (most recent call last):
File "C:\Users\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 589, in main
File "C:\Users\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 447, in read_udfs
File "C:\Users\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 254, in read_single_udf
File "C:\Users\Spark\python\lib\pyspark.zip\pyspark\worker.py", line 74, in read_command
File "C:\Users\Spark\python\lib\pyspark.zip\pyspark\serializers.py", line 172, in _read_with_length
return self.loads(obj)
File "C:\Users\Spark\python\lib\pyspark.zip\pyspark\serializers.py", line 458, in loads
return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'validations_calc'
validations_calc is the name of my Python file: validations_calc.py
I have the same Spark version in both Jupyter and the conda terminal. First I tried 2.3.0, and now I am using 3.0.0.
The traceback says it fails when I execute DF.count(). I checked that the rows and the DataFrame are well defined, but when I try DF.show() it fails as well.
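For context, Spark serializes the UDF by reference to the module that defines it, so every Python worker started from the terminal must be able to import validations_calc when unpickling. One common way to ship the defining file to the executors is SparkContext.addPyFile; a minimal sketch, with the path being a placeholder:

    # Hypothetical: make validations_calc.py importable on the workers,
    # so pickle.loads can resolve the UDF's defining module there
    spark.sparkContext.addPyFile("path/to/validations_calc.py")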