I am trying to run a Spark Streaming application on a virtual machine (the project simulates a cluster with a single node), like this:

./spark-submit --master yarn --deploy-mode cluster --driver-memory 3g /home/bigdata/PycharmProjects/pythonProject/venv/test.py
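(As an aside, I understand that the same script can also be submitted with --deploy-mode client, in which case the driver runs on the submitting machine and its errors print straight to the console, something like the line below. That is just my assumption about how it could help with debugging; cluster mode is what I actually need.)

./spark-submit --master yarn --deploy-mode client --driver-memory 3g /home/bigdata/PycharmProjects/pythonProject/venv/test.py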
But when it runs, it gets stuck in the following state:
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1609436829638
final status: UNDEFINED
tracking URL: http://bigdata-VirtualBox:8088/proxy/application_1609416014621_0023/
user: bigdata
20/12/31 18:47:11 INFO yarn.Client: Application report for application_1609416014621_0023 (state: ACCEPTED)
20/12/31 18:47:12 INFO yarn.Client: Application report for application_1609416014621_0023 (state: ACCEPTED)
20/12/31 18:47:13 INFO yarn.Client: Application report for application_1609416014621_0023 (state: ACCEPTED)
[... the same ACCEPTED report repeats once per second ...]
20/12/31 18:47:34 INFO yarn.Client: Application report for application_1609416014621_0023 (state: ACCEPTED)
This goes on for 4-5 minutes, and then it fails with the following error:
Failing this attempt.Diagnostics: [2020-12-31 18:59:34.338]Exception from container-launch.
Container id: container_1609416014621_0024_02_000001
Exit code: 13
[2020-12-31 18:59:34.340]Container exited with a non-zero exit code 13. Error file: prelaunch.err.
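(I believe the full container logs for a finished application can be retrieved with the yarn CLI, e.g.:

yarn logs -applicationId application_1609416014621_0024

but I am not sure what to look for in them.)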
I am new to Apache Spark and Hadoop, so I do not know how to handle this situation. Here is my test.py code:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
import socket

# Lazily-initialized SparkSession singleton
def getSparkSessionInstance(sparkConf):
    if "sparkSessionSingletonInstance" not in globals():
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

# Send the DataFrame contents over UDP
def send_df(df, serverPort, UDPClient):
    values = [str(t.value) for t in df.select("value").collect()]
    str1 = '-'.join(values)
    msg = str.encode(str1)
    UDPClient.sendto(msg, serverPort)

# UDP connection details
serverAddressPort = ("127.0.0.1", 3001)
DPClientSocket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)

sc = SparkContext("local[2]", "")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 2)

lines = ssc.socketTextStream("localhost", 8000)
splitted_val = lines.flatMap(lambda x: str(x).split('-'))
values = splitted_val.map(lambda x: int(x))

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the SparkSession singleton
        spark = getSparkSessionInstance(rdd.context.getConf())
        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(value=w))
        DataFrame = spark.createDataFrame(rowRdd)
        # Register a temporary view
        DataFrame.createOrReplaceTempView("values")
        # Select the values from the view with SQL and print them
        valueDataFrame = spark.sql("select value from values")
        valueDataFrame.show()
        send_df(valueDataFrame, serverAddressPort, DPClientSocket)
    except:
        pass

values.foreachRDD(process)

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the streaming to finish
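One thing I am unsure about: the script creates the context with SparkContext("local[2]", ""), while spark-submit is given --master yarn. The Spark documentation says not to hardcode the master in the program when launching through spark-submit, so perhaps the context creation should look more like the sketch below (the appName is just a placeholder of mine, and I have not tested this):

# Assumption: let spark-submit supply the master instead of hardcoding "local[2]"
sc = SparkContext(appName="StreamingTest")

Could that conflict be related to exit code 13, or is something else wrong?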
How can I solve this problem?