在hadoop中使用spark\u apply从sparkyr运行系统命令

enxuqcxy  于 2021-06-01  发布在  Hadoop
关注(0)|答案(2)|浏览(365)

我想对hadoop集群中存储的数据运行java工具。我试着使用sparkyr的spark\u apply函数来实现它,但是我对语法有点困惑。
在运行spark代码之前,我已经按照以下说明设置了conda环境:http://blog.cloudera.com/blog/2017/09/how-to-distribute-your-r-code-with-sparklyr-and-cdsw/ . 我无法访问包裹,因此需要使用本文中描述的第二个选项。conda环境还包含我想要使用的java工具。
以虹膜数据为例:

library(sparklyr)
library(tidyverse)
library(datasets)
data(iris)
config <- spark_config()
config[["spark.r.command"]] <- "./r_env.zip/r_env/bin/Rscript"
config[["spark.yarn.dist.archives"]] <- "r_env.zip"
config$sparklyr.apply.env.R_HOME <- "./r_env.zip/r_env/lib/R"
config$sparklyr.apply.env.RHOME <- "./r_env.zip/r_env"
config$sparklyr.apply.env.R_SHARE_DIR <- "./r_env.zip/r_env/lib/R/share"
config$sparklyr.apply.env.R_INCLUDE_DIR <- "./r_env.zip/r_env/lib/R/include"
sc <- spark_connect(master = "yarn-client", config = config)

# Write iris table to HDFS, partitioning by Species

iris_tbl_tmp = copy_to(sc, iris, overwrite=T)
spark_write_table(iris_tbl_tmp, "iris_byspecies", partition_by="Species")
iris_tbl = sc %>% tbl("iris_byspecies")
iris_tbl

由于java工具无法从hdfs读取数据,我实际上必须将每个数据集保存到一个文件中,运行java工具,然后再次读取数据:

myfunction = function(x) { 
    write.table(x, "tempfile.txt")
    system2("{PATH}/myjavatool.java")
    res = read.table("output_of_java_command.txt")
    res
}
myoutput = spark_apply(iris_tbl, myfunction, group_by=Species)

我的问题是关于java工具的路径。我如何才能看到Sparkyr存储conda环境的位置?
此外,有没有更简单的方法?

wnavrhmk

wnavrhmk1#

根据[Yarn上的Spark]https://spark.apache.org/docs/latest/running-on-yarn.html()指南, spark.yarn.dist.archives :
要提取到每个执行者的工作目录中的以逗号分隔的档案列表。
所以这些文件应该就在你的应用程序的工作目录中。

ki0zmccv

ki0zmccv2#

你需要打电话 sparklyr::spark_applypackages = FALSE ,这意味着sparkyr::spark\u apply将使用您的存档包(r\u env.zip)而不是 .libPaths()

相关问题