Jupyter notebook - NameError: name is not defined

rbpvctlc · asked 2021-07-12 · in Spark
Follow (0) | Answers (0) | Views (614)

This question already exists:

Jupyter notebook - NameError: name is not defined [closed]
Closed last month.
In a Jupyter notebook I have a few lines of PySpark code.
The first time I execute the cell, it works. Some time later, if I try to execute the same cell again, I get a NameError. Please help.
FYI: in the same environment, a scheduled job with the same code runs every 5 minutes.
For example:

df.show(5,False)
---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
<ipython-input-2-91a6331f23c0> in <module>
----> 1 df.show(5,False)

NameError: name 'df' is not defined
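A NameError on a name that worked earlier usually means the notebook kernel restarted (or its namespace was cleared), so every variable defined in previous cells is gone. One defensive workaround, a sketch in which the hypothetical `load_data()` stands in for the real `get_raw_data(...)` call, is to recompute the value only when its name is missing from the namespace:

```python
# Sketch of a defensive pattern: recompute a value only when its name
# has disappeared from the interpreter's global namespace (e.g. after
# a kernel restart). `load_data` is a stand-in for the notebook's
# get_raw_data(sqlCtx_shil, current_index, from_timestamp, to_timestamp).
def load_data():
    return [1, 2, 3]  # placeholder for the Elasticsearch/Spark read

if "df" not in globals():
    df = load_data()

# After the guard, `df` is guaranteed to exist.
print(df)
```

This does not fix the root cause: if the kernel is being restarted (for example by memory pressure, or by the scheduled job competing for the same resources), the restart itself still needs investigating.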

My code is as follows:

import findspark
findspark.init()
from functools import reduce
import time, datetime, argparse, math, configparser
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession, Window
from pyspark.sql.functions import split, lit, to_utc_timestamp, hour, dayofweek
from pyspark.sql.types import TimestampType, IntegerType, StringType
from datetime import timedelta
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError

sc_shil = SparkContext(appName="Test_Shil")
sc_shil.setLogLevel("WARN")
sqlCtx_shil = SQLContext(sc_shil)
spark_shil = SparkSession.builder.appName("Test_Shil").getOrCreate()
sqlCtx_shil = spark_shil  # note: rebinds sqlCtx_shil from the SQLContext to the SparkSession

es_server_ip = "es_srv"
es_server_port = "9200"
es_conn = Elasticsearch("http://user:password@es_srv:9200",use_ssl=True,verify_certs=False)

timestamp = datetime.datetime.today().strftime("%Y-%m-%d %H:%M:%S")
to_timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
from_timestamp = to_timestamp - timedelta(minutes=5)

def get_raw_data(sqlCtx_shil,current_index,from_timestamp,to_timestamp):
    q_raw = '''
    {
        "query": {
            "bool": {
                "filter": [
                    {
                        "range": {
                            "message.dat": {
                                "gte": "''' + from_timestamp +'''",
                                "lt": "''' + to_timestamp + '''"
                                }
                        }
                    }
                ]
            }
        }
    }
    '''
    print("Raw data query",q_raw)
    df  = sqlCtx_shil.read.format("org.elasticsearch.spark.sql") \
        .option("es.nodes", es_server_ip) \
        .option("es.port",es_server_port) \
        .option("es.read.metadata", "false") \
        .option("es.mapping.date.rich", "false") \
        .option("es.read.field.include","message.dat,message.max,message.value") \
        .option("es.query", q_raw) \
        .option("es.net.http.auth.user", "user") \
        .option("es.net.http.auth.pass", "password") \
        .option("es.net.ssl","true")\
        .option("es.net.ssl.cert.allow.self.signed","true")\
        .load(current_index)
    return df

df = get_raw_data(sqlCtx_shil,current_index,from_timestamp,to_timestamp)

df.show(5, False)
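As a side note, `to_timestamp` and `from_timestamp` above are `datetime` objects, so they must be rendered as strings (e.g. with `strftime`) before being concatenated into the query body. A minimal sketch of the 5-minute window, using a fixed example timestamp instead of `datetime.datetime.today()`:

```python
import datetime
from datetime import timedelta

# Fixed example timestamp (the notebook uses datetime.datetime.today())
timestamp = "2021-07-12 10:05:00"
to_timestamp = datetime.datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
from_timestamp = to_timestamp - timedelta(minutes=5)

# Render back to strings before splicing into the Elasticsearch query
gte = from_timestamp.strftime("%Y-%m-%d %H:%M:%S")
lt = to_timestamp.strftime("%Y-%m-%d %H:%M:%S")
print(gte, lt)  # 2021-07-12 10:00:00 2021-07-12 10:05:00
```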
