我想对hadoop集群中存储的数据运行java工具。我试着使用sparkyr的spark\u apply函数来实现它,但是我对语法有点困惑。
在运行spark代码之前,我已经按照以下说明设置了conda环境:http://blog.cloudera.com/blog/2017/09/how-to-distribute-your-r-code-with-sparklyr-and-cdsw/ . 我无法访问包裹,因此需要使用本文中描述的第二个选项。conda环境还包含我想要使用的java工具。
以虹膜数据为例:
library(sparklyr)
library(tidyverse)
library(datasets)
data(iris)
config <- spark_config()
config[["spark.r.command"]] <- "./r_env.zip/r_env/bin/Rscript"
config[["spark.yarn.dist.archives"]] <- "r_env.zip"
config$sparklyr.apply.env.R_HOME <- "./r_env.zip/r_env/lib/R"
config$sparklyr.apply.env.RHOME <- "./r_env.zip/r_env"
config$sparklyr.apply.env.R_SHARE_DIR <- "./r_env.zip/r_env/lib/R/share"
config$sparklyr.apply.env.R_INCLUDE_DIR <- "./r_env.zip/r_env/lib/R/include"
sc <- spark_connect(master = "yarn-client", config = config)
# Write iris table to HDFS, partitioning by Species
iris_tbl_tmp = copy_to(sc, iris, overwrite=T)
spark_write_table(iris_tbl_tmp, "iris_byspecies", partition_by="Species")
iris_tbl = sc %>% tbl("iris_byspecies")
iris_tbl
由于java工具无法从hdfs读取数据,我实际上必须将每个数据集保存到一个文件中,运行java工具,然后再次读取数据:
myfunction = function(x) {
write.table(x, "tempfile.txt")
system2("{PATH}/myjavatool.java")
res = read.table("output_of_java_command.txt")
res
}
myoutput = spark_apply(iris_tbl, myfunction, group_by=Species)
我的问题是关于java工具的路径。我如何才能看到Sparkyr存储conda环境的位置?
此外,有没有更简单的方法?
2条答案
按热度按时间wnavrhmk1#
根据[Yarn上的Spark]https://spark.apache.org/docs/latest/running-on-yarn.html()指南,
spark.yarn.dist.archives
:要提取到每个执行者的工作目录中的以逗号分隔的档案列表。
所以这些文件应该就在你的应用程序的工作目录中。
ki0zmccv2#
你需要打电话
sparklyr::spark_apply
与packages = FALSE
,这意味着sparkyr::spark\u apply将使用您的存档包(r\u env.zip)而不是.libPaths()