Unable to retrieve data from a DataFrame created with SparkR

azpvetkf posted on 2021-05-29 in Hadoop

I have the simple SparkR program below, which creates a SparkR DataFrame and then retrieves/collects data from it.

Sys.setenv(HADOOP_CONF_DIR = "/etc/hadoop/conf.cloudera.yarn")
Sys.setenv(SPARK_HOME = "/home/user/Downloads/spark-1.6.1-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master = "yarn-client",
                  sparkEnvir = list(spark.shuffle.service.enabled = TRUE,
                                    spark.dynamicAllocation.enabled = TRUE,
                                    spark.dynamicAllocation.initialExecutors = "40"))
hiveContext <- sparkRHive.init(sc)

n = 1000
x = data.frame(id = 1:n, val = rnorm(n))
xs <- createDataFrame(hiveContext, x)

xs

head(xs)
collect(xs)

I can create the DataFrame successfully and inspect it, but any operation that actually fetches the data throws the error below.
16/07/25 16:33:59 WARN TaskSetManager: Lost task 0.3 in stage 17.0 (TID 86, wlos06.nrm.minn.seagate.com): java.net.SocketTimeoutException: Accept timed out
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
        at java.net.ServerSocket.implAccept(ServerSocket.java:530)
        at java.net.ServerSocket.accept(ServerSocket.java:498)
        at org.apache.spark.api.r.RRDD$.createRWorker(RRDD.scala:432)
        at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:63)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
16/07/25 16:33:59 ERROR TaskSetManager: Task 0 in stage 17.0 failed 4 times; aborting job
16/07/25 16:33:59 ERROR RBackendHandler: org.apache.spark.sql.api.r.SQLUtils failed
Error in invokeJava(isStatic = TRUE, className, methodName, ...) :
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 4 times, most recent failure: Lost task 0.3 in stage 17.0 (TID 86, wlos06.nrm.minn.seagate.com): java.net.SocketTimeoutException: Accept timed out
        at java.net.PlainSocketImpl.socketAccept(Native Method)
        at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398)
        at java.net.ServerSocket.implAccept(ServerSocket.java:530)
        at java.net.ServerSocket.accept(ServerSocket.java:498)
        at org.apache.spark.api.r.RRDD$.createRWorker(RRDD.scala:432)
        at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:63)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPar
If I run the same code through the sparkR shell launched as below, it executes fine.

~/Downloads/spark-1.6.1-bin-hadoop2.6/bin/sparkR --master yarn-client

But when I run it from plain R via sparkR.init(master="yarn-client"), it throws the error above.
Can anyone help resolve this?


wqnecbli1#

Adding this line made the difference:

Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell")

Here is the full code:

Sys.setenv(HADOOP_CONF_DIR = "/etc/hadoop/conf.cloudera.yarn")
Sys.setenv(SPARK_HOME = "/home/user/Downloads/spark-1.6.1-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell")
sc <- sparkR.init(sparkEnvir = list(spark.shuffle.service.enabled = TRUE,
                                    spark.dynamicAllocation.enabled = TRUE,
                                    spark.dynamicAllocation.initialExecutors = "40"))
hiveContext <- sparkRHive.init(sc)

n = 1000
x = data.frame(id = 1:n, val = rnorm(n))
xs <- createDataFrame(hiveContext, x)

xs

head(xs)
collect(xs)
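
As a quick sanity check (a minimal sketch of my own, assuming the session above initializes cleanly), you can confirm that data now comes back and then shut the backend down:

res <- collect(xs)   # returns a local R data.frame
dim(res)             # expected: 1000 rows, 2 columns
sparkR.stop()        # stop the SparkR backend when finished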
