spark序列化错误

d6kp6zgx  于 2021-06-09  发布在  Hbase
关注(0)|答案(2)|浏览(297)

我正在努力学习spark+scala。我想阅读hbase,但没有mapreduce。我创建了一个简单的hbase表-“test”,并在其中放置了3个put。我想通过spark阅读它(没有使用mapreduce的hbasetest)。我尝试在shell上运行以下命令

val numbers = Array(
  new Get(Bytes.toBytes("row1")), 
  new Get(Bytes.toBytes("row2")), 
  new Get(Bytes.toBytes("row3")))
val conf = new HBaseConfiguration()
val table = new HTable(conf, "test")
sc.parallelize(numbers, numbers.length).map(table.get).count()

我不断收到错误-org.apache.spark.sparkeexception:作业中止:任务不可序列化:java.io.notserializableeexception:org.apache.hadoop.hbase.hbaseconfiguration
有人能帮我吗,我怎样才能创建一个使用可序列化配置的htable
谢谢

nqwrtyyt

nqwrtyyt1#

你的问题是 table 不可序列化(而是它的成员 conf )你试图在一个 map . 他们认为您尝试读取hbase的方式不太正确,看起来您尝试了一些特定的get,然后尝试并行地执行它们。即使你真的做到了这一点,这真的不会扩展为你要执行随机读取。您要做的是使用spark执行表扫描,下面是一段代码片段,可以帮助您完成此操作:

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, tableName)

sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

这将为您提供一个rdd,其中包含构成行的naviagablemap。下面是如何将naviagblemap更改为字符串的普通scalaMap:

...
.map(kv => (kv._1.get(), navMapToMap(kv._2.getMap)))
.map(kv => (Bytes.toString(kv._1), rowToStrMap(kv._2)))

def navMapToMap(navMap: HBaseRow): CFTimeseriesRow =
  navMap.asScala.toMap.map(cf =>
    (cf._1, cf._2.asScala.toMap.map(col =>
      (col._1, col._2.asScala.toMap.map(elem => (elem._1.toLong, elem._2))))))

def rowToStrMap(navMap: CFTimeseriesRow): CFTimeseriesRowStr =
  navMap.map(cf =>
    (Bytes.toString(cf._1), cf._2.map(col =>
      (Bytes.toString(col._1), col._2.map(elem => (elem._1, Bytes.toString(elem._2)))))))

最后一点,如果您真的想并行执行随机读取,我相信您可以将hbase表初始化放在 map .

eoigrqb6

eoigrqb62#

当你这么做的时候会发生什么
@transient val conf=新hbaseconfiguration
显然,hbase提交任务的其他部分也不可序列化。每一个问题都需要解决。
考虑实体在连接的两边是否具有相同的含义/语义。任何联系都不会。不应序列化hbaseconfiguration。但是基本体和构建在基本体之上的简单对象(不包含上下文敏感数据)可以包含在序列化中
对于上下文敏感的实体(包括hbaseconfiguration和任何面向连接的数据结构),应将它们标记为@transient,然后在readobject()方法中,应使用与客户端环境相关的值示例化它们。

相关问题