Whenever I run it, I get this error:
Traceback (most recent call last):
File "~/test-tung/spark_tf.py", line 69, in <module>
'spark_tf').master('yarn').getOrCreate()
File "~/main-projects/spark/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/session.py", line 186, in getOrCreate
File "~/main-projects/spark/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/context.py", line 371, in getOrCreate
File "~/main-projects/spark/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/context.py", line 131, in __init__
File "~/main-projects/spark/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/context.py", line 193, in _do_init
File "~/main-projects/spark/spark-3.0.0-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/context.py", line 310, in _initialize_context
File "~/main-projects/spark/spark-3.0.0-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1569, in __call__
File "~/main-projects/spark/spark-3.0.0-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.NoClassDefFoundError: org/spark_project/guava/base/Preconditions
Part of my Python application, spark_tf.py:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType
from pyspark.sql.functions import from_json, to_json, struct
import tensorflow as tf

spark = SparkSession.builder.appName(
    'spark_tf').master('yarn').getOrCreate()

# load the Keras model on the driver, then broadcast its weights/config
model = tf.keras.models.load_model('./model/kdd_binary.h5')
weights = model.get_weights()
config = model.get_config()
bc_weights = spark.sparkContext.broadcast(weights)
bc_config = spark.sparkContext.broadcast(config)

scheme = StructType().add('@timestamp', StringType()).add('@address', StringType())

# online_predict is a UDF defined elsewhere in the file
stream = spark.readStream.format('kafka') \
    .option('kafka.bootstrap.servers', 'my-host:9092') \
    .option('subscribe', 'dltest') \
    .load() \
    .selectExpr("CAST(value AS STRING)") \
    .select(from_json('value', scheme).alias('json'),
            online_predict('value').alias('result')) \
    .select(to_json(struct('result', 'json.@timestamp', 'json.@address'))
            .alias('value'))

x = stream.writeStream \
    .format('kafka') \
    .option("kafka.bootstrap.servers", 'my-host:9092') \
    .option('topic', 'dlpred') \
    .option('checkpointLocation', './kafka_checkpoint') \
    .start()
x.awaitTermination()
My submit line: spark-submit --deploy-mode client --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 spark_tf.py
I suspect Spark is set up incorrectly, but I can't tell what's causing this.
edit: I think this code is clearly running on the client rather than on the Hadoop cluster, but running it on the cluster produces the same error.
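As far as I know, the missing class org/spark_project/guava/base/Preconditions is the Guava shading used by Spark 2.x builds (Spark 3.0 renames the shaded package to org/sparkproject/guava/...), so one hypothesis is that a Spark 2.x jar is leaking onto the classpath. A minimal diagnostic sketch (find_class_in_jars is a hypothetical helper, not part of the code above) that scans a jar directory to see which jar, if any, ships the class:

```python
# Hypothetical diagnostic helper: scan a directory of jars for the class
# named in the NoClassDefFoundError, to find which jar still ships the
# Spark 2.x shaded Guava. Jars are zip archives, so zipfile can read them.
import os
import zipfile

def find_class_in_jars(jars_dir, class_name):
    """Return the jar file names under jars_dir that contain class_name.

    class_name uses slash form, e.g.
    'org/spark_project/guava/base/Preconditions'.
    """
    entry = class_name + ".class"
    hits = []
    for fname in sorted(os.listdir(jars_dir)):
        if not fname.endswith(".jar"):
            continue
        with zipfile.ZipFile(os.path.join(jars_dir, fname)) as jar:
            if entry in jar.namelist():
                hits.append(fname)
    return hits

# Example usage: point it at the Spark distribution's jars directory
# find_class_in_jars(
#     os.path.expanduser(
#         "~/main-projects/spark/spark-3.0.0-bin-hadoop3.2/jars"),
#     "org/spark_project/guava/base/Preconditions")
```

If a Spark 3.0 jars directory contains no hit for the org/spark_project form, the jar providing it (or failing to provide it) must be coming from somewhere else, e.g. a stale jar on the YARN cluster or in the --packages cache.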