This question already exists:
Jupyter notebook - NameError: name is not defined [closed]
Closed last month.
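I have a few lines of PySpark code in a Jupyter notebook.
The cell works the first time I execute it. After some time, if I execute the same cell again, I get a NameError. Please help.
FYI: in the same environment, another scheduled job with the same code runs every 5 minutes.
For example: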
df.show(5,False)
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
<ipython-input-2-91a6331f23c0> in <module>
----> 1 df.show(5,False)
NameError: name 'df' is not defined
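A NameError like the one above means the name `df` is not bound in the kernel's current namespace when the cell runs. One possible cause, which is only an assumption here and not confirmed for this setup, is that the kernel was restarted or its session was replaced between the two runs, which clears every variable. The sketch below is a minimal way to check for that and rebuild the value only when it is missing. The helper names `is_defined` and `ensure_defined` are hypothetical and not part of Jupyter or PySpark.

```python
def is_defined(name, namespace):
    """Return True if `name` is currently bound in the given namespace dict."""
    return name in namespace


def ensure_defined(name, namespace, factory):
    """Bind `name` by calling `factory()` only when it is missing, then return it.

    `namespace` is a plain dict; in a notebook cell this would be `globals()`.
    `factory` is any zero-argument callable that rebuilds the value.
    """
    if name not in namespace:
        namespace[name] = factory()
    return namespace[name]
```

In a notebook cell, `is_defined("df", globals())` returning False right before `df.show(5, False)` would suggest that the namespace was reset, rather than pointing at a problem in the Spark code itself.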
My code is as follows:
import findspark
findspark.init()
from functools import reduce
import time, datetime, argparse,math,configparser
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext,SparkSession,Window
from pyspark.sql.functions import split,lit,to_utc_timestamp,hour,dayofweek
from pyspark.sql.types import TimestampType,IntegerType,StringType
from datetime import timedelta
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError
sc_shil = SparkContext(appName="Test_Shil")
sc_shil.setLogLevel("WARN")
sqlCtx_shil = SQLContext(sc_shil)
spark_shil = SparkSession.builder.appName("Test_Shil").getOrCreate()
sqlCtx_shil = spark_shil
es_server_ip = "es_srv"
es_server_port = "9200"
es_conn = Elasticsearch("http://user:password@es_srv:9200",use_ssl=True,verify_certs=False)
# Build the 5-minute query window as formatted strings,
# because get_raw_data concatenates them into the query text.
ts_format = "%Y-%m-%d %H:%M:%S"
now = datetime.datetime.today()
to_timestamp = now.strftime(ts_format)
from_timestamp = (now - timedelta(minutes=5)).strftime(ts_format)
def get_raw_data(sqlCtx_shil,current_index,from_timestamp,to_timestamp):
    q_raw = '''
    {
        "query": {
            "bool": {
                "filter": [
                    {
                        "range": {
                            "message.dat": {
                                "gte": "''' + from_timestamp +'''",
                                "lt": "''' + to_timestamp + '''"
                            }
                        }
                    }
                ]
            }
        }
    }
    '''
    print("Raw data query",q_raw)
    df = sqlCtx_shil.read.format("org.elasticsearch.spark.sql") \
        .option("es.nodes", es_server_ip) \
        .option("es.port",es_server_port) \
        .option("es.read.metadata", "false") \
        .option("es.mapping.date.rich", "false") \
        .option("es.read.field.include","message.dat,message.max,message.value") \
        .option("es.query", q_raw) \
        .option("es.net.http.auth.user", "user") \
        .option("es.net.http.auth.pass", "password") \
        .option("es.net.ssl","true")\
        .option("es.net.ssl.cert.allow.self.signed","true")\
        .load(current_index)
    return df
df = get_raw_data(sqlCtx_shil,current_index,from_timestamp,to_timestamp)
df.show(5, False)
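The range query in `get_raw_data` is assembled by string concatenation, which only works when both timestamps are already strings. As a hedged alternative, the same query body could be built as a dictionary and serialized with `json.dumps`. This is a minimal sketch, not the code from the question: the helper name `build_range_query` is hypothetical, and it assumes the timestamps are passed as preformatted strings.

```python
import json


def build_range_query(field, from_timestamp, to_timestamp):
    """Return a JSON string with a bool/filter range query on `field`.

    Both timestamps are expected to be strings already formatted the way
    the index stores them; no date parsing happens here.
    """
    query = {
        "query": {
            "bool": {
                "filter": [
                    {
                        "range": {
                            field: {
                                "gte": from_timestamp,  # inclusive lower bound
                                "lt": to_timestamp,     # exclusive upper bound
                            }
                        }
                    }
                ]
            }
        }
    }
    return json.dumps(query)
```

The returned string could then be passed where `q_raw` is used, for example `.option("es.query", build_range_query("message.dat", from_timestamp, to_timestamp))`. Passing a non-string timestamp would fail at `json.dumps` with a clear TypeError instead of producing a malformed query.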