如何在spark notebook中从accumulo 1.6创建spark rdd?

0yg35tkg  于 2021-05-30  发布在  Hadoop
关注(0)|答案(2)|浏览(398)

我有一个流浪的形象与Spark笔记本,Spark,Accumulo1.6,和hadoop都在运行。在notebook中,我可以手动创建扫描仪,并从使用accumulo示例之一创建的表中提取测试数据:

val instanceNameS = "accumulo"
val zooServersS = "localhost:2181"
val instance: Instance = new ZooKeeperInstance(instanceNameS, zooServersS)
val connector: Connector = instance.getConnector( "root", new PasswordToken("password"))
val auths = new Authorizations("exampleVis")
val scanner = connector.createScanner("batchtest1", auths)

scanner.setRange(new Range("row_0000000000", "row_0000000010"))

for(entry: Entry[Key, Value] <- scanner) {
  println(entry.getKey + " is " + entry.getValue)
}

将给出表数据的前十行。
因此,当我尝试创建rdd时:

val rdd2 = 
  sparkContext.newAPIHadoopRDD (
    new Configuration(), 
    classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat], 
    classOf[org.apache.accumulo.core.data.Key], 
    classOf[org.apache.accumulo.core.data.Value]
  )

由于以下错误,我得到了一个无法处理的rdd:
java.io.ioexception:尚未设置输入信息。在org.apache.acumulo.core.client.mapreduce.lib.impl.inputconfigurator.validateoptions(inputconfigurator。java:630)在org.apache.acumulo.core.client.mapreduce.abstractinputformat.validateoptions(abstractinputformat。java:343)位于org.apache.acumulo.core.client.mapreduce.abstractinputformat.getsplits(abstractinputformat.com)。java:538)在org.apache.spark.rdd.newhadooprdd.getpartitions(newhadooprdd)。scala:98)在org.apache.spark.rdd.rdd$$anonfun$partitions$2.apply(rdd。scala:222)在org.apache.spark.rdd.rdd$$anonfun$partitions$2.apply(rdd。scala:220)在scala.option.getorelse(option。scala:120)在org.apache.spark.rdd.rdd.partitions(rdd。scala:220)在org.apache.spark.sparkcontext.runjob(sparkcontext。scala:1367)在org.apache.spark.rdd.rdd.count(rdd。scala:927)
这完全是有意义的,因为我没有指定任何参数来连接哪个表,身份验证是什么,等等。
所以我的问题是:从这里开始,我需要做什么才能将表数据的前十行放入rdd?
更新一个仍然不起作用,但我确实发现了一些事情。原来有两个几乎相同的包裹,
org.apache.acumulo.core.client.mapreduce下载
&
org.apache.acumulo.core.client.mapred
两者的成员几乎相同,只是有些方法签名不同。我不知道为什么两者都存在,因为我看不到任何反对意见。我试图毫无喜悦地实现西泽的回答。以下是我所做的,以及我的回答:

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
val jobConf = new JobConf(new Configuration)

import org.apache.hadoop.mapred.jobconf import org.apache.hadoop.conf.configuration jobconf:org.apache.hadoop.mapred.jobconf=配置:core-default.xml,core-site.xml,mapred-default.xml,mapred-site.xml,yarn-default.xml,yarn-site.xml
配置:core-default.xml、core-site.xml、mapred-default.xml、mapred-site.xml、yarn-default.xml、yarn-site.xml

AbstractInputFormat.setConnectorInfo(jobConf, 
                                     "root", 
                                     new PasswordToken("password")

AbstractInputFormat.setScanAuthorizations(jobConf, auths)

AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)

val rdd2 = 
  sparkContext.hadoopRDD (
    jobConf, 
    classOf[org.apache.accumulo.core.client.mapred.AccumuloInputFormat], 
    classOf[org.apache.accumulo.core.data.Key], 
    classOf[org.apache.accumulo.core.data.Value], 
    1
  )

rdd2:org.apache.spark.rdd.rdd[(org.apache.accumulo.core.data.key,org.apache.accumulo.core.data.value)]=hadooprdd处的hadooprdd[1]hadooprdd at:62

rdd2.first

java.io.ioexception:尚未设置输入信息。在org.apache.acumulo.core.client.mapreduce.lib.impl.inputconfigurator.validateoptions(inputconfigurator。java:630)在org.apache.acumulo.core.client.mapred.abstractinputformat.validateoptions(abstractinputformat。java:308)在org.apache.accumulo.core.client.mapred.abstractinputformat.getsplits(abstractinputformat.org.apache.accumulo.core.client.mapred.abstractinputformat.getsplits)上。java:505)在org.apache.spark.rdd.hadooprdd.getpartitions(hadooprdd。scala:201)在org.apache.spark.rdd.rdd$$anonfun$partitions$2.apply(rdd。scala:222)在org.apache.spark.rdd.rdd$$anonfun$partitions$2.apply(rdd。scala:220)在scala.option.getorelse(option。scala:120)在org.apache.spark.rdd.rdd.partitions(rdd。scala:220)在org.apache.spark.rdd.rdd.take(rdd。scala:1077)在org.apache.spark.rdd.rdd.first(rdd。scala:1110)在$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc。(:64)$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc。(:69)在。。。

  • 编辑2*

回复:霍尔顿的回答-仍然没有欢乐:

AbstractInputFormat.setConnectorInfo(jobConf, 
                                         "root", 
                                         new PasswordToken("password")
    AbstractInputFormat.setScanAuthorizations(jobConf, auths)
    AbstractInputFormat.setZooKeeperInstance(jobConf, new ClientConfiguration)
    InputFormatBase.setInputTableName(jobConf, "batchtest1")
    val rddX = sparkContext.newAPIHadoopRDD(
      jobConf, 
      classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat], 
      classOf[org.apache.accumulo.core.data.Key], 
      classOf[org.apache.accumulo.core.data.Value]
      )

rddx:org.apache.spark.rdd.rdd[(org.apache.accumulo.core.data.key,org.apache.accumulo.core.data.value)]=newhadooprdd[0]位于newapihadooprdd:58
out[15]:newhadooprdd[0]位于newapihadooprdd:58

rddX.first

java.io.ioexception:尚未设置输入信息。在org.apache.acumulo.core.client.mapreduce.lib.impl.inputconfigurator.validateoptions(inputconfigurator。java:630)在org.apache.acumulo.core.client.mapreduce.abstractinputformat.validateoptions(abstractinputformat。java:343)位于org.apache.acumulo.core.client.mapreduce.abstractinputformat.getsplits(abstractinputformat.com)。java:538)在org.apache.spark.rdd.newhadooprdd.getpartitions(newhadooprdd)。scala:98)在org.apache.spark.rdd.rdd$$anonfun$partitions$2.apply(rdd。scala:222)在org.apache.spark.rdd.rdd$$anonfun$partitions$2.apply(rdd。scala:220)在scala.option.getorelse(option。scala:120)在org.apache.spark.rdd.rdd.partitions(rdd。scala:220)在org.apache.spark.rdd.rdd.take(rdd。scala:1077)在org.apache.spark.rdd.rdd.first(rdd。scala:1110)在$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc$$iwc。(:61)
编辑3——进度!
我能找出为什么'输入信息未设置'错误发生。你们中间的鹰眼无疑会看到下面的代码缺少一个结束符'('

AbstractInputFormat.setConnectorInfo(jobConf, "root", new PasswordToken("password")

当我在spark笔记本上做这件事时,我一直在点击execute按钮继续前进,因为我没有看到错误。我忘了笔记本会像spark shell那样做,当你去掉一个结束语')'——它会永远等着你添加它。因此,该错误是“setconnectorinfo”方法从未执行的结果。
不幸的是,我仍然无法将accumulo表数据放入对我有用的rdd中。当我执行

rddX.count

我回来了
res15:长=10000
这是正确的回答-我所指的表中有10000行数据。然而,当我试图抓住数据的第一个元素时:

rddX.first

我得到以下错误:
org.apache.spark.sparkexception:由于阶段失败而中止作业:阶段0.0(tid 0)中的任务0.0具有不可序列化的结果:org.apache.accumulo.core.data.key
你有什么想法吗?
编辑4——成功!
接受的答案+注解占90%——除了acumulo键/值需要转换成可序列化的内容。我通过对这两个函数调用.tostring()方法来实现这一点。我会尽快发布一些完整的工作代码,以防其他人遇到同样的问题。

vuv7lop3

vuv7lop31#

这些参数似乎必须通过静态方法设置:http://accumulo.apache.org/1.6/apidocs/org/apache/accumulo/core/client/mapred/accumuloinputformat.html. 因此,请尝试设置非可选参数并再次运行。应该有用。

s2j5cfk0

s2j5cfk02#

通常,对于自定义hadoop输入格式,使用jobconf指定信息。正如@sietse所指出的,在acumuloinputformat上有一些静态方法可以用来配置jobconf。在这种情况下,我想你应该做的是:

val jobConf = new JobConf() // Create a job conf
// Configure the job conf with our accumulo properties
AccumuloInputFormat.setConnectorInfo(jobConf, principal, token)
AccumuloInputFormat.setScanAuthorizations(jobConf, authorizations)
val clientConfig =  new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers)
AccumuloInputFormat.setZooKeeperInstance(jobConf, clientConfig)
AccumuloInputFormat.setInputTableName(jobConf, tableName)
// Create an RDD using the jobConf
val rdd2 = sc.newAPIHadoopRDD(jobConf, 
classOf[org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat], 
classOf[org.apache.accumulo.core.data.Key], 
classOf[org.apache.accumulo.core.data.Value]
)

注意:在深入研究代码之后,is configured属性似乎部分是基于所调用的类设置的(这对于避免与其他包的潜在冲突是有意义的),因此当我们稍后在具体类中获取它时,它找不到is configured标志。解决方法是不使用抽象类。看到了吗https://github.com/apache/accumulo/blob/bf102d0711103e903afa0589500f5796ad51c366/core/src/main/java/org/apache/accumulo/core/client/mapreduce/lib/impl/configuratorbase.java#l127 实施细节)。如果您不能在spark笔记本的具体实现上调用此方法,那么使用spark shell或定期构建的应用程序可能是最简单的解决方案。

相关问题