假设如果直接从hdfs而不是使用hbase api访问数据,我们可以更快地访问数据,我们正在尝试基于hbase的表快照构建rdd。
所以,我有一个快照叫做“dm\u test\u snap”。我似乎能够让大多数配置工作,但我的rdd是空的(尽管快照本身有数据)。
我花了很长时间才找到一个例子,任何人都可以用spark离线分析hbase快照,但我不敢相信只有我一个人在努力让它工作。如有任何帮助或建议,我们将不胜感激。
以下是我的代码片段:
object TestSnap {
def main(args: Array[String]) {
val config = ConfigFactory.load()
val hbaseRootDir = config.getString("hbase.rootdir")
val sparkConf = new SparkConf()
.setAppName("testnsnap")
.setMaster(config.getString("spark.app.master"))
.setJars(SparkContext.jarOfObject(this))
.set("spark.executor.memory", "2g")
.set("spark.default.parallelism", "160")
val sc = new SparkContext(sparkConf)
println("Creating hbase configuration")
val conf = HBaseConfiguration.create()
conf.set("hbase.rootdir", hbaseRootDir)
conf.set("hbase.zookeeper.quorum", config.getString("hbase.zookeeper.quorum"))
conf.set("zookeeper.session.timeout", config.getString("zookeeper.session.timeout"))
conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "dm_test_snap")
val scan = new Scan
val job = Job.getInstance(conf)
TableSnapshotInputFormat.setInput(job, "dm_test_snap",
new Path("hdfs://nameservice1/tmp"))
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableSnapshotInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD.count()
System.exit(0)
}
}
更新以包含解决方案技巧是,正如@holden在下面提到的,conf没有通过。为了解决这个问题,我将newapihadooprdd的调用改为:
val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
还有第二个问题,@victor的回答也强调了这一点,那就是我没有通过扫描。为了解决这个问题,我添加了以下行和方法:
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
def convertScanToString(scan : Scan) = {
val proto = ProtobufUtil.toScan(scan);
Base64.encodeBytes(proto.toByteArray());
}
这也让我从conf.set命令中拉出这一行:
conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "dm_test_snap")
- 注意:这适用于cdh5.0上的hbase版本0.96.1.1
便于参考的最终完整代码:
object TestSnap {
def main(args: Array[String]) {
val config = ConfigFactory.load()
val hbaseRootDir = config.getString("hbase.rootdir")
val sparkConf = new SparkConf()
.setAppName("testnsnap")
.setMaster(config.getString("spark.app.master"))
.setJars(SparkContext.jarOfObject(this))
.set("spark.executor.memory", "2g")
.set("spark.default.parallelism", "160")
val sc = new SparkContext(sparkConf)
println("Creating hbase configuration")
val conf = HBaseConfiguration.create()
conf.set("hbase.rootdir", hbaseRootDir)
conf.set("hbase.zookeeper.quorum", config.getString("hbase.zookeeper.quorum"))
conf.set("zookeeper.session.timeout", config.getString("zookeeper.session.timeout"))
val scan = new Scan
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
val job = Job.getInstance(conf)
TableSnapshotInputFormat.setInput(job, "dm_test_snap",
new Path("hdfs://nameservice1/tmp"))
val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD.count()
System.exit(0)
}
def convertScanToString(scan : Scan) = {
val proto = ProtobufUtil.toScan(scan);
Base64.encodeBytes(proto.toByteArray());
}
}
3条答案
按热度按时间csga3l581#
查看作业信息,它会复制您提供给它的conf对象(
The Job makes a copy of the Configuration so that any necessary internal modifications do not reflect on the incoming parameter.
)因此,您需要在conf对象上设置的信息很可能没有传递给spark。你可以用TableSnapshotInputFormatImpl
它有一个类似的方法可以处理conf对象。可能还需要一些额外的东西,但首先通过这个问题,这似乎是最有可能的原因。正如评论中指出的,另一种选择是使用
job.getConfiguration
从作业对象获取更新的配置。ajsxfq5m2#
您尚未正确配置m/r作业:这是java中有关如何通过快照配置m/r的示例:
你,当然,跳过了扫描。我建议您看看tablemapreduceutil的inittablesnapshotmapperjob实现,了解如何在spark/scala中配置job。
apeeds0o3#
下面是mapreducejava中的完整配置